GCPMachinePoolMachines and reworking some of the main pool logic
BrennenMM7 committed Apr 15, 2024
1 parent 97c5227 commit da5db9d
Showing 22 changed files with 2,453 additions and 57 deletions.
3 changes: 3 additions & 0 deletions cloud/const.go
@@ -22,4 +22,7 @@ const (
	// See https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
	// for annotation formatting rules.
	CustomDataHashAnnotation = "sigs.k8s.io/cluster-api-provider-gcp-mig-custom-data-hash"

	// ClusterAPIImagePrefix is the prefix for the image name used by the Cluster API provider for GCP.
	ClusterAPIImagePrefix = "capi-ubuntu-1804-k8s-"
)
2 changes: 1 addition & 1 deletion cloud/scope/machine.go
@@ -223,7 +223,7 @@ func (m *MachineScope) InstanceImageSpec() *compute.AttachedDisk {
	if m.Machine.Spec.Version != nil {
		version = *m.Machine.Spec.Version
	}
	image := "capi-ubuntu-1804-k8s-" + strings.ReplaceAll(semver.MajorMinor(version), ".", "-")
	image := cloud.ClusterAPIImagePrefix + strings.ReplaceAll(semver.MajorMinor(version), ".", "-")
	sourceImage := path.Join("projects", m.ClusterGetter.Project(), "global", "images", "family", image)
	if m.GCPMachine.Spec.Image != nil {
		sourceImage = *m.GCPMachine.Spec.Image
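To illustrate the renamed constant, here is a standalone sketch (assumed version and project values, not part of the commit) of how the image family name is derived:

package main

import (
	"fmt"
	"path"
	"strings"

	"golang.org/x/mod/semver"
)

// Mirrors cloud.ClusterAPIImagePrefix introduced in this commit.
const clusterAPIImagePrefix = "capi-ubuntu-1804-k8s-"

func main() {
	version := "v1.27.3" // hypothetical value of *m.Machine.Spec.Version
	// semver.MajorMinor yields "v1.27"; dots become dashes in the family name.
	image := clusterAPIImagePrefix + strings.ReplaceAll(semver.MajorMinor(version), ".", "-")
	fmt.Println(image) // capi-ubuntu-1804-k8s-v1-27
	fmt.Println(path.Join("projects", "my-project", "global", "images", "family", image))
	// projects/my-project/global/images/family/capi-ubuntu-1804-k8s-v1-27
}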
248 changes: 239 additions & 9 deletions cloud/scope/machinepool.go
@@ -24,25 +24,30 @@ import (
	"os"
	"path"
	"sort"
	"strconv"
	"strings"

	"github.com/pkg/errors"
	"golang.org/x/mod/semver"
	"google.golang.org/api/compute/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	infrav1 "sigs.k8s.io/cluster-api-provider-gcp/api/v1beta1"
	"sigs.k8s.io/cluster-api-provider-gcp/cloud"
	machinepool "sigs.k8s.io/cluster-api-provider-gcp/cloud/scope/strategies/machinepool_deployments"
	infrav1exp "sigs.k8s.io/cluster-api-provider-gcp/exp/api/v1beta1"
	"sigs.k8s.io/cluster-api-provider-gcp/util/processors"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	capierrors "sigs.k8s.io/cluster-api/errors"
	clusterv1exp "sigs.k8s.io/cluster-api/exp/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/annotations"
	"sigs.k8s.io/cluster-api/util/conditions"
	"sigs.k8s.io/cluster-api/util/labels/format"
	"sigs.k8s.io/cluster-api/util/patch"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

@@ -59,10 +64,11 @@ type (
		Client                     client.Client
		PatchHelper                *patch.Helper
		CapiMachinePoolPatchHelper *patch.Helper

		ClusterGetter cloud.ClusterGetter
		MachinePool *clusterv1exp.MachinePool
		GCPMachinePool *infrav1exp.GCPMachinePool
		ClusterGetter  cloud.ClusterGetter
		MachinePool    *clusterv1exp.MachinePool
		GCPMachinePool *infrav1exp.GCPMachinePool
		migState       *compute.InstanceGroupManager
		migInstances   []*compute.ManagedInstance
	}
)

@@ -98,6 +104,16 @@ func NewMachinePoolScope(params MachinePoolScopeParams) (*MachinePoolScope, erro
	}, nil
}

// SetMIGState updates the machine pool scope with the current state of the MIG.
func (m *MachinePoolScope) SetMIGState(migState *compute.InstanceGroupManager) {
	m.migState = migState
}

// SetMIGInstances updates the machine pool scope with the current state of the MIG instances.
func (m *MachinePoolScope) SetMIGInstances(migInstances []*compute.ManagedInstance) {
	m.migInstances = migInstances
}

// SetReady sets the GCPMachinePool Ready Status to true.
func (m *MachinePoolScope) SetReady() {
	m.GCPMachinePool.Status.Ready = true
@@ -134,8 +150,19 @@ func (m *MachinePoolScope) PatchObject(ctx context.Context) error {

// Close closes the current scope persisting the cluster configuration and status.
func (m *MachinePoolScope) Close(ctx context.Context) error {
	if m.migState != nil && m.migInstances != nil {
		if err := m.applyGCPMachinePoolMachines(ctx); err != nil {
			return errors.Wrap(err, "failed to apply GCPMachinePoolMachines")
		}

		m.setProvisioningStateAndConditions()
		if err := m.updateReplicasAndProviderIDs(ctx); err != nil {
			return errors.Wrap(err, "failed to update replicas and providerIDs")
		}
	}

	if err := m.PatchObject(ctx); err != nil {
		return err
		return errors.Wrap(err, "failed to patch GCPMachinePool")
	}
	if err := m.PatchCAPIMachinePoolObject(ctx); err != nil {
		return errors.Wrap(err, "unable to patch CAPI MachinePool")
@@ -144,6 +171,204 @@ func (m *MachinePoolScope) Close(ctx context.Context) error {
	return nil
}
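For orientation, a hypothetical caller-side sketch of how a reconciler might seed the scope before Close runs (the migService helper and its method names are illustrative, not part of this commit):

func (r *GCPMachinePoolReconciler) reconcileNormal(ctx context.Context, scope *scope.MachinePoolScope) error {
	// Fetch the MIG and its managed instances (helpers assumed for illustration).
	mig, err := r.migService.GetInstanceGroupManager(ctx)
	if err != nil {
		return err
	}
	instances, err := r.migService.ListManagedInstances(ctx)
	if err != nil {
		return err
	}
	scope.SetMIGState(mig)
	scope.SetMIGInstances(instances)

	// With both set, Close applies GCPMachinePoolMachines, updates conditions
	// and replica counts, then patches the GCPMachinePool and CAPI MachinePool.
	return scope.Close(ctx)
}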

// updateReplicasAndProviderIDs updates the GCPMachinePool replicas and providerIDs.
func (m *MachinePoolScope) updateReplicasAndProviderIDs(ctx context.Context) error {
	machines, err := m.GetMachinePoolMachines(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to get machine pool machines")
	}

	var readyReplicas int32
	providerIDs := make([]string, len(machines))
	for i, machine := range machines {
		if machine.Status.Ready {
			readyReplicas++
		}
		providerIDs[i] = machine.Spec.ProviderID
	}

	m.GCPMachinePool.Status.Replicas = readyReplicas
	m.GCPMachinePool.Spec.ProviderIDList = providerIDs
	m.MachinePool.Spec.ProviderIDList = providerIDs
	m.MachinePool.Status.Replicas = readyReplicas
	return nil
}
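Note the asymmetry this encodes: ProviderIDList carries every tracked machine, while Replicas counts only the ready ones. A minimal standalone illustration of the counting rule (types simplified, not from the commit):

type fakeMachine struct {
	ready      bool
	providerID string
}

// tally reproduces the loop above over simplified inputs.
func tally(machines []fakeMachine) (readyReplicas int32, providerIDs []string) {
	providerIDs = make([]string, len(machines))
	for i, mach := range machines {
		if mach.ready {
			readyReplicas++
		}
		providerIDs[i] = mach.providerID // listed whether ready or not
	}
	return readyReplicas, providerIDs
}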

// setProvisioningStateAndConditions sets the GCPMachinePool provisioning state and conditions.
func (m *MachinePoolScope) setProvisioningStateAndConditions() {
	switch {
	case *m.MachinePool.Spec.Replicas == m.GCPMachinePool.Status.Replicas:
		// MIG is provisioned with enough ready replicas
		m.SetReady()
		conditions.MarkTrue(m.ConditionSetter(), infrav1exp.GCPMachinePoolReadyCondition)
		conditions.MarkFalse(m.ConditionSetter(), infrav1exp.GCPMachinePoolCreatingCondition, infrav1exp.GCPMachinePoolUpdatedReason, clusterv1.ConditionSeverityInfo, "")
		conditions.MarkFalse(m.ConditionSetter(), infrav1exp.GCPMachinePoolUpdatingCondition, infrav1exp.GCPMachinePoolUpdatedReason, clusterv1.ConditionSeverityInfo, "")
	case *m.MachinePool.Spec.Replicas != m.GCPMachinePool.Status.Replicas:
		// MIG is still provisioning
		m.SetNotReady()
		conditions.MarkFalse(m.ConditionSetter(), infrav1exp.GCPMachinePoolReadyCondition, infrav1exp.GCPMachinePoolCreatingReason, clusterv1.ConditionSeverityInfo, "")
		conditions.MarkTrue(m.ConditionSetter(), infrav1exp.GCPMachinePoolUpdatingCondition)
	default:
		m.SetNotReady()
		conditions.MarkFalse(m.ConditionSetter(), infrav1exp.GCPMachinePoolReadyCondition, infrav1exp.GCPMachinePoolCreatingReason, clusterv1.ConditionSeverityInfo, "")
		conditions.MarkTrue(m.ConditionSetter(), infrav1exp.GCPMachinePoolUpdatingCondition)
	}
}
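For example, with Spec.Replicas set to 3: once Status.Replicas also reaches 3, the pool is marked Ready and the Creating/Updating conditions are cleared; while Status.Replicas lags at 1 or 2, the pool stays NotReady with the Updating condition set.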

func (m *MachinePoolScope) applyGCPMachinePoolMachines(ctx context.Context) error {
	log := log.FromContext(ctx)

	if m.migState == nil {
		return nil
	}

	gmpms, err := m.GetMachinePoolMachines(ctx)
	if err != nil {
		return err
	}

	existingMachinesByProviderID := make(map[string]infrav1exp.GCPMachinePoolMachine, len(gmpms))
	for _, machine := range gmpms {
		existingMachinesByProviderID[machine.Spec.ProviderID] = machine
	}

	gcpMachinesByProviderID := m.InstancesByProviderID()
	for key, val := range gcpMachinesByProviderID {
		if _, ok := existingMachinesByProviderID[key]; !ok {
			log.Info("Creating GCPMachinePoolMachine", "machine", val.Name)
			if err := m.createMachine(ctx, val); err != nil {
				return errors.Wrap(err, "failed creating GCPMachinePoolMachine")
			}
			continue
		}
	}

	deleted := false
	// delete machines that no longer exist in GCP
	for key, machine := range existingMachinesByProviderID {
		machine := machine
		if _, ok := gcpMachinesByProviderID[key]; !ok {
			deleted = true
			log.V(4).Info("deleting GCPMachinePoolMachine because it no longer exists in the MIG", "providerID", key)
			delete(existingMachinesByProviderID, key)
			if err := m.Client.Delete(ctx, &machine); err != nil {
				return errors.Wrap(err, "failed deleting GCPMachinePoolMachine no longer existing in GCP")
			}
		}
	}

	if deleted {
		log.Info("GCPMachinePoolMachines deleted, requeueing")
		return nil
	}

	// when replicas are externally managed, we do not want to scale down manually since that is handled by the external scaler.
	if m.HasReplicasExternallyManaged(ctx) {
		log.Info("Replicas are externally managed, skipping scaling down")
		return nil
	}

	deleteSelector := m.getDeploymentStrategy()
	if deleteSelector == nil {
		log.V(4).Info("can not select GCPMachinePoolMachines to delete because no deployment strategy is specified")
		return nil
	}

	// select machines to delete to lower the replica count
	toDelete, err := deleteSelector.SelectMachinesToDelete(ctx, m.DesiredReplicas(), existingMachinesByProviderID)
	if err != nil {
		return errors.Wrap(err, "failed selecting GCPMachinePoolMachines to delete")
	}

	for _, machine := range toDelete {
		machine := machine
		log.Info("deleting selected GCPMachinePoolMachine", "providerID", machine.Spec.ProviderID)
		if err := m.Client.Delete(ctx, &machine); err != nil {
			return errors.Wrap(err, "failed deleting GCPMachinePoolMachine to reduce replica count")
		}
	}
	return nil
}
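Schematically, this is a set reconciliation keyed by providerID. With hypothetical instances node-a and node-b tracked as GCPMachinePoolMachines and node-b and node-c reported RUNNING by the MIG: node-c is in the MIG with no machine object, so createMachine creates one; node-a has a machine object but is gone from the MIG, so it is deleted and the reconcile requeues; node-b exists in both and is left alone here, though the deployment strategy may still select it later to bring the count down to DesiredReplicas().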

func (m *MachinePoolScope) createMachine(ctx context.Context, managedInstance compute.ManagedInstance) error {
	gmpm := infrav1exp.GCPMachinePoolMachine{
		ObjectMeta: metav1.ObjectMeta{
			Name:      managedInstance.Name,
			Namespace: m.GCPMachinePool.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion:         infrav1exp.GroupVersion.String(),
					Kind:               "GCPMachinePool",
					Name:               m.GCPMachinePool.Name,
					BlockOwnerDeletion: ptr.To(true),
					UID:                m.GCPMachinePool.UID,
				},
			},
			Labels: map[string]string{
				m.ClusterGetter.Name():          string(infrav1.ResourceLifecycleOwned),
				clusterv1.ClusterNameLabel:      m.ClusterGetter.Name(),
				infrav1exp.MachinePoolNameLabel: m.GCPMachinePool.Name,
				clusterv1.MachinePoolNameLabel:  format.MustFormatValue(m.MachinePool.Name),
			},
		},
		Spec: infrav1exp.GCPMachinePoolMachineSpec{
			ProviderID: m.ProviderIDInstance(&managedInstance),
			InstanceID: strconv.FormatUint(managedInstance.Id, 10),
		},
	}

	controllerutil.AddFinalizer(&gmpm, infrav1exp.GCPMachinePoolMachineFinalizer)
	if err := m.Client.Create(ctx, &gmpm); err != nil {
		return errors.Wrapf(err, "failed creating GCPMachinePoolMachine %s in GCPMachinePool %s", managedInstance.Name, m.GCPMachinePool.Name)
	}

	return nil
}

func (m *MachinePoolScope) getDeploymentStrategy() machinepool.TypedDeleteSelector {
	if m.GCPMachinePool == nil {
		return nil
	}

	return machinepool.NewMachinePoolDeploymentStrategy(m.GCPMachinePool.Spec.Strategy)
}

// GetMachinePoolMachines returns the list of GCPMachinePoolMachines associated with this GCPMachinePool.
func (m *MachinePoolScope) GetMachinePoolMachines(ctx context.Context) ([]infrav1exp.GCPMachinePoolMachine, error) {
	labels := m.getMachinePoolMachineLabels()
	gmpml := &infrav1exp.GCPMachinePoolMachineList{}
	if err := m.Client.List(ctx, gmpml, client.InNamespace(m.GCPMachinePool.Namespace), client.MatchingLabels(labels)); err != nil {
		return nil, errors.Wrap(err, "failed to list GCPMachinePoolMachines")
	}

	return gmpml.Items, nil
}

// DesiredReplicas returns the replica count on machine pool or 0 if machine pool replicas is nil.
func (m MachinePoolScope) DesiredReplicas() int32 {
	return ptr.Deref(m.MachinePool.Spec.Replicas, 0)
}

// InstancesByProviderID returns a map of the MIG's RUNNING managed instances keyed by providerID.
func (m *MachinePoolScope) InstancesByProviderID() map[string]compute.ManagedInstance {
	instances := make(map[string]compute.ManagedInstance, len(m.migInstances))
	for _, instance := range m.migInstances {
		if instance.InstanceStatus == "RUNNING" {
			instances[m.ProviderIDInstance(instance)] = *instance
		}
	}
	return instances
}

func (m *MachinePoolScope) getMachinePoolMachineLabels() map[string]string {
	return map[string]string{
		clusterv1.ClusterNameLabel:      m.ClusterGetter.Name(),
		infrav1exp.MachinePoolNameLabel: m.GCPMachinePool.Name,
		clusterv1.MachinePoolNameLabel:  format.MustFormatValue(m.MachinePool.Name),
		m.ClusterGetter.Name():          string(infrav1.ResourceLifecycleOwned),
	}
}

// InstanceGroupTemplateBuilder returns a GCP instance template.
func (m *MachinePoolScope) InstanceGroupTemplateBuilder(bootstrapData string) *compute.InstanceTemplate {
	instanceTemplate := &compute.InstanceTemplate{
@@ -250,7 +475,7 @@ func (m *MachinePoolScope) InstanceImageSpec() *compute.AttachedDisk {
	if m.MachinePool.Spec.Template.Spec.Version != nil {
		version = *m.MachinePool.Spec.Template.Spec.Version
	}
	image := "capi-ubuntu-1804-k8s-" + strings.ReplaceAll(semver.MajorMinor(version), ".", "-")
	image := cloud.ClusterAPIImagePrefix + strings.ReplaceAll(semver.MajorMinor(version), ".", "-")
	sourceImage := path.Join("projects", m.ClusterGetter.Project(), "global", "images", "family", image)
	if m.GCPMachinePool.Spec.Image != nil {
		sourceImage = *m.GCPMachinePool.Spec.Image
@@ -393,16 +618,21 @@ func (m *MachinePoolScope) SetAnnotation(key, value string) {
	m.GCPMachinePool.Annotations[key] = value
}

// Namespace returns the GCPMachine namespace.
// Namespace returns the GCPMachinePool namespace.
func (m *MachinePoolScope) Namespace() string {
	return m.MachinePool.Namespace
}

// Name returns the GCPMachine name.
// Name returns the GCPMachinePool name.
func (m *MachinePoolScope) Name() string {
	return m.GCPMachinePool.Name
}

// ProviderIDInstance returns the GCPMachinePool providerID for a managed instance.
func (m *MachinePoolScope) ProviderIDInstance(managedInstance *compute.ManagedInstance) string {
	return fmt.Sprintf("gce://%s/%s/%s", m.Project(), m.GCPMachinePool.Spec.Zone, managedInstance.Name)
}
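For an assumed project my-proj, zone us-central1-a, and managed instance gcp-mp-7f2c, this yields the providerID:

	gce://my-proj/us-central1-a/gcp-mp-7f2c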

// HasReplicasExternallyManaged returns true if the machine pool has replicas externally managed.
func (m *MachinePoolScope) HasReplicasExternallyManaged(_ context.Context) bool {
	return annotations.ReplicasManagedByExternalAutoscaler(m.MachinePool)
@@ -421,7 +651,7 @@ func (m *MachinePoolScope) UpdateCAPIMachinePoolReplicas(_ context.Context, repl
	m.MachinePool.Spec.Replicas = replicas
}

// ReconcileReplicas ensures MachinePool replicas match MIG capacity if replicas are externally managed by an autoscaler.
// ReconcileReplicas ensures MachinePool replicas match MIG capacity unless replicas are externally managed by an autoscaler.
func (m *MachinePoolScope) ReconcileReplicas(ctx context.Context, mig *compute.InstanceGroupManager) error {
	log := log.FromContext(ctx)
