Refactor.
Gerrit91 committed Nov 14, 2024
1 parent 469acd2 commit 642184e
Showing 1 changed file with 78 additions and 58 deletions.
136 changes: 78 additions & 58 deletions internal/controller/metalstackmachine_controller.go
@@ -58,6 +58,16 @@ type MetalStackMachineReconciler struct {
     Scheme *runtime.Scheme
 }

+type machineReconciler struct {
+    metalClient metalgo.Client
+    client client.Client
+    ctx context.Context
+    log logr.Logger
+    infraCluster *v1alpha1.MetalStackCluster
+    clusterMachine *clusterv1.Machine
+    infraMachine *v1alpha1.MetalStackMachine
+}
+
 // +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=machines,verbs=get;list;watch
 // +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=metalstackmachines,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=metalstackmachines/status,verbs=get;update;patch
@@ -119,8 +129,18 @@ func (r *MetalStackMachineReconciler) Reconcile(ctx context.Context, req ctrl.Re
         return ctrl.Result{}, err
     }

+    reconciler := &machineReconciler{
+        metalClient: r.MetalClient,
+        client: r.Client,
+        ctx: ctx,
+        log: log,
+        infraCluster: infraCluster,
+        clusterMachine: machine,
+        infraMachine: infraMachine,
+    }
+
     defer func() {
-        statusErr := r.status(ctx, infraCluster, machine, infraMachine)
+        statusErr := reconciler.status()
         if statusErr != nil {
             err = errors.Join(err, fmt.Errorf("unable to update status: %w", statusErr))
         }
@@ -132,7 +152,7 @@ func (r *MetalStackMachineReconciler) Reconcile(ctx context.Context, req ctrl.Re
         }

         log.Info("reconciling resource deletion flow")
-        err := r.delete(ctx, log, infraCluster, infraMachine, machine)
+        err := reconciler.delete()
         if err != nil {
             return ctrl.Result{}, err
         }
@@ -168,7 +188,7 @@ func (r *MetalStackMachineReconciler) Reconcile(ctx context.Context, req ctrl.Re
         return ctrl.Result{}, errors.New("waiting until control plane ip was set to cluster spec")
     }

-    err = r.reconcile(ctx, log, infraCluster, machine, infraMachine)
+    err = reconciler.reconcile()

     return ctrl.Result{}, err // remember to return err here and not nil because the defer func can influence this
 }
@@ -181,14 +201,14 @@ func (r *MetalStackMachineReconciler) SetupWithManager(mgr ctrl.Manager) error {
         Complete(r)
 }

-func (r *MetalStackMachineReconciler) reconcile(ctx context.Context, log logr.Logger, infraCluster *v1alpha1.MetalStackCluster, clusterMachine *clusterv1.Machine, infraMachine *v1alpha1.MetalStackMachine) error {
-    m, err := r.findProviderMachine(ctx, infraCluster, infraMachine, util.IsControlPlaneMachine(clusterMachine))
+func (r *machineReconciler) reconcile() error {
+    m, err := r.findProviderMachine()
     if err != nil && !errors.Is(err, errProviderMachineNotFound) {
         return err
     }

     if errors.Is(err, errProviderMachineNotFound) {
-        m, err = r.create(ctx, infraCluster, infraMachine, util.IsControlPlaneMachine(clusterMachine))
+        m, err = r.create()
         if err != nil {
             return fmt.Errorf("unable to create machine at provider: %w", err)
         }
@@ -198,25 +218,25 @@ func (r *MetalStackMachineReconciler) reconcile(ctx context.Context, log logr.Lo
         return errors.New("machine allocated but got no provider ID")
     }

-    log.Info("setting provider id into machine resource")
+    r.log.Info("setting provider id into machine resource")

-    helper, err := patch.NewHelper(infraMachine, r.Client)
+    helper, err := patch.NewHelper(r.infraMachine, r.client)
     if err != nil {
         return err
     }

-    infraMachine.Spec.ProviderID = *m.ID
+    r.infraMachine.Spec.ProviderID = *m.ID

-    err = helper.Patch(ctx, infraMachine) // TODO:check whether patch is not executed when no changes occur
+    err = helper.Patch(r.ctx, r.infraMachine) // TODO:check whether patch is not executed when no changes occur
     if err != nil {
-        return fmt.Errorf("failed to update infra machine provider ID %q: %w", infraMachine.Spec.ProviderID, err)
+        return fmt.Errorf("failed to update infra machine provider ID %q: %w", r.infraMachine.Spec.ProviderID, err)
     }

     return nil
 }

-func (r *MetalStackMachineReconciler) delete(ctx context.Context, log logr.Logger, infraCluster *v1alpha1.MetalStackCluster, infraMachine *v1alpha1.MetalStackMachine, clusterMachine *clusterv1.Machine) error {
-    m, err := r.findProviderMachine(ctx, infraCluster, infraMachine, util.IsControlPlaneMachine(clusterMachine))
+func (r *machineReconciler) delete() error {
+    m, err := r.findProviderMachine()
     if errors.Is(err, errProviderMachineNotFound) {
         // metal-stack machine already freed
         return nil
@@ -225,31 +245,31 @@ func (r *MetalStackMachineReconciler) delete(ctx context.Context, log logr.Logge
         return fmt.Errorf("failed to find provider machine: %w", err)
     }

-    _, err = r.MetalClient.Machine().FreeMachine(metalmachine.NewFreeMachineParamsWithContext(ctx).WithID(*m.ID), nil)
+    _, err = r.metalClient.Machine().FreeMachine(metalmachine.NewFreeMachineParamsWithContext(r.ctx).WithID(*m.ID), nil)
     if err != nil {
         return fmt.Errorf("failed to delete provider machine: %w", err)
     }

-    log.Info("freed provider machine")
+    r.log.Info("freed provider machine")

     return nil
 }

-func (r *MetalStackMachineReconciler) create(ctx context.Context, infraCluster *v1alpha1.MetalStackCluster, infraMachine *v1alpha1.MetalStackMachine, isControlPlaneMachine bool) (*models.V1MachineResponse, error) {
+func (r *machineReconciler) create() (*models.V1MachineResponse, error) {
     var (
         ips []string
         nws = []*models.V1MachineAllocationNetwork{
             {
                 Autoacquire: ptr.To(true),
-                Networkid: infraCluster.Status.NodeNetworkID,
+                Networkid: r.infraCluster.Status.NodeNetworkID,
             },
         }
     )

-    if isControlPlaneMachine {
-        ips = append(ips, infraCluster.Spec.ControlPlaneEndpoint.Host)
+    if util.IsControlPlaneMachine(r.clusterMachine) {
+        ips = append(ips, r.infraCluster.Spec.ControlPlaneEndpoint.Host)

-        resp, err := r.MetalClient.IP().FindIP(ipmodels.NewFindIPParams().WithID(infraCluster.Spec.ControlPlaneEndpoint.Host).WithContext(ctx), nil)
+        resp, err := r.metalClient.IP().FindIP(ipmodels.NewFindIPParams().WithID(r.infraCluster.Spec.ControlPlaneEndpoint.Host).WithContext(r.ctx), nil)
         if err != nil {
             return nil, fmt.Errorf("unable to lookup control plane ip: %w", err)
         }
@@ -260,16 +280,16 @@ func (r *MetalStackMachineReconciler) create(ctx context.Context, infraCluster *
         })
     }

-    resp, err := r.MetalClient.Machine().AllocateMachine(metalmachine.NewAllocateMachineParamsWithContext(ctx).WithBody(&models.V1MachineAllocateRequest{
-        Partitionid: &infraCluster.Spec.Partition,
-        Projectid: &infraCluster.Spec.ProjectID,
-        PlacementTags: []string{tag.New(tag.ClusterID, string(infraCluster.GetUID()))},
-        Tags: machineTags(infraCluster, infraMachine, isControlPlaneMachine),
-        Name: infraMachine.Name,
-        Hostname: infraMachine.Name,
-        Sizeid: &infraMachine.Spec.Size,
-        Imageid: &infraMachine.Spec.Image,
-        Description: fmt.Sprintf("%s/%s for cluster %s/%s", infraMachine.Namespace, infraMachine.Name, infraCluster.Namespace, infraCluster.Name),
+    resp, err := r.metalClient.Machine().AllocateMachine(metalmachine.NewAllocateMachineParamsWithContext(r.ctx).WithBody(&models.V1MachineAllocateRequest{
+        Partitionid: &r.infraCluster.Spec.Partition,
+        Projectid: &r.infraCluster.Spec.ProjectID,
+        PlacementTags: []string{tag.New(tag.ClusterID, string(r.infraCluster.GetUID()))},
+        Tags: r.machineTags(),
+        Name: r.infraMachine.Name,
+        Hostname: r.infraMachine.Name,
+        Sizeid: &r.infraMachine.Spec.Size,
+        Imageid: &r.infraMachine.Spec.Image,
+        Description: fmt.Sprintf("%s/%s for cluster %s/%s", r.infraMachine.Namespace, r.infraMachine.Name, r.infraCluster.Namespace, r.infraCluster.Name),
         Networks: nws,
         Ips: ips,
         // TODO: UserData, SSHPubKeys, ...
@@ -281,14 +301,14 @@ func (r *MetalStackMachineReconciler) create(ctx context.Context, infraCluster *
     return resp.Payload, nil
 }

-func (r *MetalStackMachineReconciler) status(ctx context.Context, infraCluster *v1alpha1.MetalStackCluster, clusterMachine *clusterv1.Machine, infraMachine *v1alpha1.MetalStackMachine) error {
+func (r *machineReconciler) status() error {
     var (
-        g, _ = errgroup.WithContext(ctx)
+        g, _ = errgroup.WithContext(r.ctx)
         conditionUpdates = make(chan func())

         // TODO: probably there is a helper for this available somewhere?
         allConditionsTrue = func() bool {
-            for _, c := range infraMachine.Status.Conditions {
+            for _, c := range r.infraMachine.Status.Conditions {
                 if c.Status != corev1.ConditionTrue {
                     return false
                 }
@@ -303,21 +323,21 @@ func (r *MetalStackMachineReconciler) status(ctx context.Context, infraCluster *
     }()

     g.Go(func() error {
-        m, err := r.findProviderMachine(ctx, infraCluster, infraMachine, util.IsControlPlaneMachine(clusterMachine))
+        m, err := r.findProviderMachine()

         conditionUpdates <- func() {
             switch {
             case err != nil && !errors.Is(err, errProviderMachineNotFound):
-                conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineCreated, "InternalError", clusterv1.ConditionSeverityError, "%s", err.Error())
-                conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "machine not created")
+                conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineCreated, "InternalError", clusterv1.ConditionSeverityError, "%s", err.Error())
+                conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "machine not created")
             case err != nil && errors.Is(err, errProviderMachineNotFound):
-                conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineCreated, "NotCreated", clusterv1.ConditionSeverityError, "%s", err.Error())
-                conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "machine not created")
+                conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineCreated, "NotCreated", clusterv1.ConditionSeverityError, "%s", err.Error())
+                conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "machine not created")
             default:
-                if infraMachine.Spec.ProviderID == *m.ID {
-                    conditions.MarkTrue(infraMachine, v1alpha1.ProviderMachineCreated)
+                if r.infraMachine.Spec.ProviderID == *m.ID {
+                    conditions.MarkTrue(r.infraMachine, v1alpha1.ProviderMachineCreated)
                 } else {
-                    conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineCreated, "NotSet", clusterv1.ConditionSeverityWarning, "provider id was not yet patched into the machine's spec")
+                    conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineCreated, "NotSet", clusterv1.ConditionSeverityWarning, "provider id was not yet patched into the machine's spec")
                 }

                 var errs []error
@@ -338,15 +358,15 @@ func (r *MetalStackMachineReconciler) status(ctx context.Context, infraCluster *
                 }

                 if len(errs) == 0 {
-                    conditions.MarkTrue(infraMachine, v1alpha1.ProviderMachineHealthy)
+                    conditions.MarkTrue(r.infraMachine, v1alpha1.ProviderMachineHealthy)
                 } else {
-                    conditions.MarkFalse(infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "%s", errors.Join(errs...).Error())
+                    conditions.MarkFalse(r.infraMachine, v1alpha1.ProviderMachineHealthy, "NotHealthy", clusterv1.ConditionSeverityWarning, "%s", errors.Join(errs...).Error())
                 }

-                infraMachine.Status.Addresses = nil
+                r.infraMachine.Status.Addresses = nil

                 if m.Allocation.Hostname != nil {
-                    infraMachine.Status.Addresses = append(infraMachine.Status.Addresses, clusterv1.MachineAddress{
+                    r.infraMachine.Status.Addresses = append(r.infraMachine.Status.Addresses, clusterv1.MachineAddress{
                         Type: clusterv1.MachineHostName,
                         Address: *m.Allocation.Hostname,
                     })
@@ -356,14 +376,14 @@ func (r *MetalStackMachineReconciler) status(ctx context.Context, infraCluster *
                     switch ptr.Deref(nw.Networktype, "") {
                     case "privateprimaryunshared":
                         for _, ip := range nw.Ips {
-                            infraMachine.Status.Addresses = append(infraMachine.Status.Addresses, clusterv1.MachineAddress{
+                            r.infraMachine.Status.Addresses = append(r.infraMachine.Status.Addresses, clusterv1.MachineAddress{
                                 Type: clusterv1.MachineInternalIP,
                                 Address: ip,
                             })
                         }
                     case "external":
                         for _, ip := range nw.Ips {
-                            infraMachine.Status.Addresses = append(infraMachine.Status.Addresses, clusterv1.MachineAddress{
+                            r.infraMachine.Status.Addresses = append(r.infraMachine.Status.Addresses, clusterv1.MachineAddress{
                                 Type: clusterv1.MachineExternalIP,
                                 Address: ip,
                             })
@@ -384,22 +404,22 @@ func (r *MetalStackMachineReconciler) status(ctx context.Context, infraCluster *

     groupErr := g.Wait()
     if groupErr == nil && allConditionsTrue() {
-        infraMachine.Status.Ready = true
+        r.infraMachine.Status.Ready = true
     }

-    err := r.Client.Status().Update(ctx, infraMachine)
+    err := r.client.Status().Update(r.ctx, r.infraMachine)

     return errors.Join(groupErr, err)
 }

-func (r *MetalStackMachineReconciler) findProviderMachine(ctx context.Context, infraCluster *v1alpha1.MetalStackCluster, infraMachine *v1alpha1.MetalStackMachine, isControlPlaneMachine bool) (*models.V1MachineResponse, error) {
+func (r *machineReconciler) findProviderMachine() (*models.V1MachineResponse, error) {
     mfr := &models.V1MachineFindRequest{
-        ID: infraMachine.Spec.ProviderID,
-        AllocationProject: infraCluster.Spec.ProjectID,
-        Tags: machineTags(infraCluster, infraMachine, isControlPlaneMachine),
+        ID: r.infraMachine.Spec.ProviderID,
+        AllocationProject: r.infraCluster.Spec.ProjectID,
+        Tags: r.machineTags(),
     }

-    resp, err := r.MetalClient.Machine().FindMachines(metalmachine.NewFindMachinesParamsWithContext(ctx).WithBody(mfr), nil)
+    resp, err := r.metalClient.Machine().FindMachines(metalmachine.NewFindMachinesParamsWithContext(r.ctx).WithBody(mfr), nil)
     if err != nil {
         return nil, err
     }
@@ -415,13 +435,13 @@ func (r *MetalStackMachineReconciler) findProviderMachine(ctx context.Context, i
     }
 }

-func machineTags(infraCluster *v1alpha1.MetalStackCluster, infraMachine *v1alpha1.MetalStackMachine, isControlPlaneMachine bool) []string {
+func (r *machineReconciler) machineTags() []string {
     tags := []string{
-        tag.New(tag.ClusterID, string(infraCluster.GetUID())),
-        tag.New(v1alpha1.TagInfraMachineID, string(infraMachine.GetUID())),
+        tag.New(tag.ClusterID, string(r.infraCluster.GetUID())),
+        tag.New(v1alpha1.TagInfraMachineID, string(r.infraMachine.GetUID())),
     }

-    if isControlPlaneMachine {
+    if util.IsControlPlaneMachine(r.clusterMachine) {
         tags = append(tags, v1alpha1.TagControlPlanePurpose)
     }

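For readers unfamiliar with the refactoring applied above: methods that each repeated the same argument bundle (context, logger, cluster, machine) become methods on a small struct that carries that per-reconcile state. A minimal, generic Go sketch of the idea follows; the names (machineScope, create, delete) are illustrative only and are not taken from this repository.

package main

import (
    "context"
    "fmt"
)

// Before: every helper repeats the same parameter list, e.g.
//   func createMachine(ctx context.Context, clusterID, machineName string) error
//   func deleteMachine(ctx context.Context, clusterID, machineName string) error

// After: the per-request state is collected once in a scope struct,
// and the helpers become methods that read it from the receiver.
type machineScope struct {
    ctx         context.Context
    clusterID   string
    machineName string
}

func (s *machineScope) create() error {
    // placeholder for the real provider call
    fmt.Printf("creating %s in cluster %s\n", s.machineName, s.clusterID)
    return nil
}

func (s *machineScope) delete() error {
    // placeholder for the real provider call
    fmt.Printf("deleting %s in cluster %s\n", s.machineName, s.clusterID)
    return nil
}

func main() {
    // The caller builds the scope once per request and then calls thin methods.
    scope := &machineScope{
        ctx:         context.Background(),
        clusterID:   "cluster-a",
        machineName: "machine-1",
    }
    _ = scope.create()
    _ = scope.delete()
}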
