Skip to content

Commit

Permalink
feat: Adds autoscaling logic for new Chain and Schedule policies (#3929)
Browse files Browse the repository at this point in the history
* Add application logic for Schedule and Chain Policy within the autoscaler
* Add schedule and chain policy tests and add calculation for cronStart and cronEnd times
* Remove unnecessary mustParseDate calls within validation test
* Flesh out tests for autoscaling/schedule logic
  • Loading branch information
indexjoseph authored Aug 7, 2024
1 parent 049de2f commit ff4c222
Show file tree
Hide file tree
Showing 5 changed files with 546 additions and 21 deletions.
10 changes: 3 additions & 7 deletions pkg/apis/autoscaling/v1/fleetautoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,9 @@ func (c *ChainPolicy) ValidateChainPolicy(fldPath *field.Path) field.ErrorList {
seenIDs[entry.ID] = true
}
// Ensure that chain entry has a policy
hasValidPolicy := entry.Buffer == nil && entry.Webhook == nil && entry.Counter == nil && entry.List == nil && entry.Schedule == nil
if entry.Type == "" || hasValidPolicy {
allErrs = append(allErrs, field.Required(fldPath.Index(i), "policy is missing"))
}
// Ensure the chain entry's policy is not a chain policy (to avoid nested chain policies)
if entry.Chain != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Index(i), entry.FleetAutoscalerPolicy.Type, "chain policy cannot be used in chain policy"))
hasValidPolicy := entry.Buffer != nil || entry.Webhook != nil || entry.Counter != nil || entry.List != nil || entry.Schedule != nil
if entry.Type == "" || !hasValidPolicy {
allErrs = append(allErrs, field.Required(fldPath.Index(i), "valid policy is missing"))
}
// Validate the chain entry's policy
allErrs = append(allErrs, entry.FleetAutoscalerPolicy.ValidatePolicy(fldPath.Index(i).Child("policy"))...)
Expand Down
4 changes: 1 addition & 3 deletions pkg/apis/autoscaling/v1/fleetautoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,6 @@ func TestFleetAutoscalerScheduleValidateUpdate(t *testing.T) {
},
"end time before start time": {
fas: modifiedFAS(func(fap *FleetAutoscalerPolicy) {
mustParseDate("3999-06-15T15:59:59Z")
mustParseDate("3999-05-15T15:59:59Z")
fap.Schedule.Between.Start = mustParseDate("3999-06-15T15:59:59Z")
fap.Schedule.Between.End = mustParseDate("3999-05-15T15:59:59Z")
}),
Expand Down Expand Up @@ -588,7 +586,7 @@ func TestFleetAutoscalerChainValidateUpdate(t *testing.T) {
}
}),
featureFlags: string(runtime.FeatureScheduledAutoscaler) + "=true",
wantLength: 2,
wantLength: 1,
wantField: "spec.policy.chain[1]",
},
"invalid nested policy format": {
Expand Down
5 changes: 3 additions & 2 deletions pkg/fleetautoscalers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,9 @@ func (c *Controller) syncFleetAutoscaler(ctx context.Context, key string) error
}

currentReplicas := fleet.Status.Replicas
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas, fleet, c.gameServerLister, c.counter.Counts())
if err != nil {
desiredReplicas, scalingLimited, err := computeDesiredFleetSize(fas.Spec.Policy, fleet, c.gameServerLister, c.counter.Counts())
// If there err is nil and not an inactive schedule error (ignorable in this case), then record the event
if err != nil && !errors.Is(err, InactiveScheduleError{}) {
c.recorder.Eventf(fas, corev1.EventTypeWarning, "FleetAutoscaler",
"Error calculating desired fleet size on FleetAutoscaler %s. Error: %s", fas.ObjectMeta.Name, err.Error())

Expand Down
130 changes: 124 additions & 6 deletions pkg/fleetautoscalers/fleetautoscalers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"

Expand All @@ -41,6 +42,8 @@ import (
"agones.dev/agones/pkg/util/runtime"
)

const maxDuration = "2540400h" // 290 Years

var tlsConfig = &tls.Config{}
var client = http.Client{
Timeout: 15 * time.Second,
Expand All @@ -49,18 +52,29 @@ var client = http.Client{
},
}

// InactiveScheduleError denotes an error for schedules that are not currently active.
type InactiveScheduleError struct{}

func (InactiveScheduleError) Error() string {
return "inactive schedule, policy not applicable"
}

// computeDesiredFleetSize computes the new desired size of the given fleet
func computeDesiredFleetSize(fas *autoscalingv1.FleetAutoscaler, f *agonesv1.Fleet,
func computeDesiredFleetSize(pol autoscalingv1.FleetAutoscalerPolicy, f *agonesv1.Fleet,
gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount) (int32, bool, error) {
switch fas.Spec.Policy.Type {
switch pol.Type {
case autoscalingv1.BufferPolicyType:
return applyBufferPolicy(fas.Spec.Policy.Buffer, f)
return applyBufferPolicy(pol.Buffer, f)
case autoscalingv1.WebhookPolicyType:
return applyWebhookPolicy(fas.Spec.Policy.Webhook, f)
return applyWebhookPolicy(pol.Webhook, f)
case autoscalingv1.CounterPolicyType:
return applyCounterOrListPolicy(fas.Spec.Policy.Counter, nil, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(pol.Counter, nil, f, gameServerLister, nodeCounts)
case autoscalingv1.ListPolicyType:
return applyCounterOrListPolicy(nil, fas.Spec.Policy.List, f, gameServerLister, nodeCounts)
return applyCounterOrListPolicy(nil, pol.List, f, gameServerLister, nodeCounts)
case autoscalingv1.SchedulePolicyType:
return applySchedulePolicy(pol.Schedule, f, gameServerLister, nodeCounts, time.Now())
case autoscalingv1.ChainPolicyType:
return applyChainPolicy(pol.Chain, f, gameServerLister, nodeCounts, time.Now())
}

return 0, false, errors.New("wrong policy type, should be one of: Buffer, Webhook, Counter, List")
Expand Down Expand Up @@ -362,6 +376,110 @@ func applyCounterOrListPolicy(c *autoscalingv1.CounterPolicy, l *autoscalingv1.L
return 0, false, errors.Errorf("unable to apply ListPolicy %v", l)
}

func applySchedulePolicy(s *autoscalingv1.SchedulePolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount, currentTime time.Time) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply SchedulePolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

if isScheduleActive(s, currentTime) {
return computeDesiredFleetSize(s.Policy, f, gameServerLister, nodeCounts)
}

// If the schedule wasn't active then return the current replica amount of the fleet
return f.Status.Replicas, false, &InactiveScheduleError{}
}

func applyChainPolicy(c autoscalingv1.ChainPolicy, f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister, nodeCounts map[string]gameservers.NodeCount, currentTime time.Time) (int32, bool, error) {
// Ensure the scheduled autoscaler feature gate is enabled
if !runtime.FeatureEnabled(runtime.FeatureScheduledAutoscaler) {
return 0, false, errors.Errorf("cannot apply ChainPolicy unless feature flag %s is enabled", runtime.FeatureScheduledAutoscaler)
}

replicas := f.Status.Replicas
var limited bool
var err error

// Loop over all entries in the chain
for _, entry := range c {
switch entry.Type {
case autoscalingv1.SchedulePolicyType:
replicas, limited, err = applySchedulePolicy(entry.Schedule, f, gameServerLister, nodeCounts, currentTime)
// If no error was returned from the schedule policy (schedule is active and/or webhook policy within schedule was successful), then return the values given
if err == nil {
return replicas, limited, nil
}
case autoscalingv1.WebhookPolicyType:
replicas, limited, err = applyWebhookPolicy(entry.Webhook, f)
// If no error was returned from the webhook policy, then return the values given
if err == nil {
return replicas, limited, nil
}
default:
// Every other policy type we just want to compute the desired fleet and return it
return computeDesiredFleetSize(entry.FleetAutoscalerPolicy, f, gameServerLister, nodeCounts)
}

}

// Fall off the chain
return replicas, limited, err
}

// isScheduleActive checks if a chain entry's is active and returns a boolean, true if active, false otherwise
func isScheduleActive(s *autoscalingv1.SchedulePolicy, currentTime time.Time) bool {
// Used for checking ahead of the schedule for daylight savings purposes
cronDelta := (time.Minute * -1) + (time.Second * -30)

// If the current time is before the start time, the schedule is inactive so return false
startTime := s.Between.Start.Time
if currentTime.Before(startTime) {
return false
}

// If an end time is present and the current time is after the end time, the schedule is inactive so return false
endTime := s.Between.End.Time
if !endTime.IsZero() && currentTime.After(endTime) {
return false
}

// If no startCron field is specified, then it's automatically true (duration is no longer relevant since we're always running)
if s.ActivePeriod.StartCron == "" {
return true
}

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled
location, _ := time.LoadLocation(s.ActivePeriod.Timezone)

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled
startCron, _ := cron.ParseStandard(s.ActivePeriod.StartCron)

// Ignore the error as validation is already done within the validateChainPolicy after being unmarshalled.
// If the duration is empty set it to the largest duration possible (290 years)
duration, _ := time.ParseDuration(s.ActivePeriod.Duration)
if s.ActivePeriod.Duration == "" {
duration, _ = time.ParseDuration(maxDuration)
}

// Get the current time - duration
currentTimeMinusDuration := currentTime.Add(duration * -1)
// Take (current time - duration) to get the first available start time
cronStartTime := startCron.Next(currentTimeMinusDuration.In(location))
// Take the (cronStartTime + duration) to get the end time
cronEndTime := cronStartTime.Add(duration)

// If the current time is after the cronStartTime - 90 seconds (for daylight saving purposes) AND the current time before the cronEndTime
// then return true
// Example: startCron = 0 14 * * * // 2:00 PM Everyday | duration = 1 hr | cronDelta = 90 seconds | currentTime = 2024-08-01T14:30:00Z | currentTimeMinusDuration = 2024-08-01T13:30:00Z
// then cronStartTime = 2024-08-01T14:00:00Z and cronEndTime = 2024-08-01T15:00:00Z
// and since currentTime > cronStartTime + cronDelta AND currentTime < cronEndTime, we return true
if currentTime.After(cronStartTime.Add(cronDelta)) && currentTime.Before(cronEndTime) {
return true
}

return false
}

// getSortedGameServers returns the list of Game Servers for the Fleet in the order in which the
// Game Servers would be deleted.
func getSortedGameServers(f *agonesv1.Fleet, gameServerLister listeragonesv1.GameServerLister,
Expand Down
Loading

0 comments on commit ff4c222

Please sign in to comment.