Skip to content

Commit

Permalink
1. Validates Cloudstack template name and kubernetes version from the…
Browse files Browse the repository at this point in the history
… controller side and waits for both to match for further reconciliation.

2. Since we have seperated ETCD and CP reconciliation, it waits for ETCD to be ready before marking ControlPlaneReady to true.
3. From the CLI side during management cluster upgrade, waits for Cluster FailureMessage to be nil before waiting for other condition to met.
  • Loading branch information
panktishah26 committed Sep 27, 2023
1 parent a62abf3 commit 439e7a3
Show file tree
Hide file tree
Showing 22 changed files with 409 additions and 53 deletions.
4 changes: 2 additions & 2 deletions pkg/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (c *Config) ChildObjects() []kubernetes.Object {
// ClusterAndChildren returns all kubernetes objects in the cluster Config.
// It's equivalent to appending the Cluster to the result of ChildObjects.
func (c *Config) ClusterAndChildren() []kubernetes.Object {
objs := c.ChildObjects()
return append(objs, c.Cluster)
objs := []kubernetes.Object{c.Cluster}
return append(objs, c.ChildObjects()...)
}

func appendIfNotNil(objs []kubernetes.Object, elems ...kubernetes.Object) []kubernetes.Object {
Expand Down
31 changes: 21 additions & 10 deletions pkg/cluster/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@ import (
// or the retrier timeouts. If observedGeneration is not equal to generation,
// the condition is considered false regardless of the status value.
func WaitForCondition(ctx context.Context, client kubernetes.Reader, cluster *anywherev1.Cluster, retrier *retrier.Retrier, conditionType anywherev1.ConditionType) error {
return WaitFor(ctx, client, cluster, retrier, func(c *anywherev1.Cluster) error {
condition := conditions.Get(c, conditionType)
if condition == nil {
return fmt.Errorf("cluster doesn't yet have condition %s", conditionType)
}

if condition.Status != corev1.ConditionTrue {
return fmt.Errorf("cluster condition %s is %s: %s", conditionType, condition.Status, condition.Message)
}
return nil
})
}

// Matcher matches the given condition.
type Matcher func(*anywherev1.Cluster) error

// WaitFor gets the cluster object from the client
// checks for generation and observedGeneration condition
// matches condition and returns error if the condition is not met.
func WaitFor(ctx context.Context, client kubernetes.Reader, cluster *anywherev1.Cluster, retrier *retrier.Retrier, matcher Matcher) error {
return retrier.Retry(func() error {
c := &anywherev1.Cluster{}

Expand All @@ -35,15 +55,6 @@ func WaitForCondition(ctx context.Context, client kubernetes.Reader, cluster *an
return fmt.Errorf("cluster generation (%d) and observedGeneration (%d) differ", generation, observedGeneration)
}

condition := conditions.Get(c, conditionType)
if condition == nil {
return fmt.Errorf("cluster doesn't yet have condition %s", conditionType)
}

if condition.Status != corev1.ConditionTrue {
return fmt.Errorf("cluster condition %s is %s: %s", conditionType, condition.Status, condition.Message)
}

return nil
return matcher(c)
})
}
110 changes: 110 additions & 0 deletions pkg/cluster/wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster_test

import (
"context"
"fmt"
"testing"

. "github.com/onsi/gomega"
Expand Down Expand Up @@ -153,3 +154,112 @@ func TestWaitForCondition(t *testing.T) {
})
}
}

func TestWaitFor(t *testing.T) {
testCases := []struct {
name string
clusterInput, currentCluster *anywherev1.Cluster
retrier *retrier.Retrier
matcher func(_ *anywherev1.Cluster) error
wantErr string
}{
{
name: "cluster does not exist",
clusterInput: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "my-n",
},
},
currentCluster: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "default",
},
},
retrier: retrier.NewWithMaxRetries(1, 0),
matcher: func(_ *anywherev1.Cluster) error {
return nil
},
wantErr: "clusters.anywhere.eks.amazonaws.com \"my-c\" not found",
},
{
name: "cluster namespace not provided",
clusterInput: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
},
},
currentCluster: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "eksa-namespace",
},
},
retrier: retrier.NewWithMaxRetries(1, 0),
matcher: func(_ *anywherev1.Cluster) error {
return nil
},
wantErr: "clusters.anywhere.eks.amazonaws.com \"my-c\" not found",
},
{
name: "observed generation not updated",
clusterInput: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "my-n",
},
},
currentCluster: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "my-n",
Generation: 5,
},
Status: anywherev1.ClusterStatus{
ObservedGeneration: 4,
},
},
retrier: retrier.NewWithMaxRetries(1, 0),
matcher: func(_ *anywherev1.Cluster) error {
return nil
},
wantErr: "cluster generation (5) and observedGeneration (4) differ",
},
{
name: "matcher return error",
clusterInput: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "my-n",
},
},
currentCluster: &anywherev1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: "my-c",
Namespace: "my-n",
},
},
retrier: retrier.NewWithMaxRetries(1, 0),
matcher: func(_ *anywherev1.Cluster) error {
return fmt.Errorf("error")
},
wantErr: "error",
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
g := NewWithT(t)
client := test.NewFakeKubeClient(tt.currentCluster)

gotErr := cluster.WaitFor(ctx, client, tt.clusterInput, tt.retrier, tt.matcher)
if tt.wantErr != "" {
g.Expect(gotErr).To(MatchError(tt.wantErr))
} else {
g.Expect(gotErr).NotTo(HaveOccurred())
}
})
}
}
10 changes: 10 additions & 0 deletions pkg/clustermanager/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clustermanager

import (
"context"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -129,6 +130,15 @@ func (a Applier) Run(ctx context.Context, spec *cluster.Spec, managementCluster
waitStartTime := time.Now()
retry := a.retrierForWait(waitStartTime)

if err := cluster.WaitFor(ctx, client, spec.Cluster, retrier.New(5*time.Second), func(c *anywherev1.Cluster) error {
if c.Status.FailureMessage != nil && *c.Status.FailureMessage != "" {
return fmt.Errorf("cluster has an error: %s", *c.Status.FailureMessage)
}
return nil
}); err != nil {
return fmt.Errorf("cluster has a validation error that doesn't seem transient: %s", err)
}

a.log.V(3).Info("Waiting for control plane to be ready")
if err := cluster.WaitForCondition(ctx, client, spec.Cluster, retry, anywherev1.ControlPlaneReadyCondition); err != nil {
return errors.Wrapf(err, "waiting for cluster's control plane to be ready")
Expand Down
17 changes: 17 additions & 0 deletions pkg/clustermanager/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (a *applierTest) buildClient(objs ...kubernetes.Object) {
a.clientFactory.EXPECT().BuildClientFromKubeconfig(a.mgmtCluster.KubeconfigFile).Return(a.client, nil)
}

func (a *applierTest) updateFailureMessage(c *anywherev1.Cluster, err string) {
c.Status.FailureMessage = ptr.String(err)
a.Expect(a.client.Update(a.ctx, c)).To(Succeed())
}

func (a *applierTest) markCPReady(c *anywherev1.Cluster) {
conditions.MarkTrue(c, anywherev1.ControlPlaneReadyCondition)
a.Expect(a.client.Update(a.ctx, c)).To(Succeed())
Expand Down Expand Up @@ -173,6 +178,18 @@ func TestApplierRunErrorApplying(t *testing.T) {
tt.Expect(a.Run(tt.ctx, tt.spec, tt.mgmtCluster)).To(MatchError(ContainSubstring("applying cluster spec")))
}

func TestApplierRunFailureMessage(t *testing.T) {
tt := newApplierTest(t)
tt.buildClient(tt.spec.ClusterAndChildren()...)
tt.updateFailureMessage(tt.spec.Cluster, "error")
tt.startFakeController()
a := clustermanager.NewApplier(tt.log, tt.clientFactory,
clustermanager.WithApplierRetryBackOff(time.Millisecond),
)

tt.Expect(a.Run(tt.ctx, tt.spec, tt.mgmtCluster)).To(MatchError(ContainSubstring("cluster has a validation error that doesn't seem transient")))
}

func TestApplierRunControlPlaneNotReady(t *testing.T) {
tt := newApplierTest(t)
tt.buildClient()
Expand Down
26 changes: 25 additions & 1 deletion pkg/controller/clusters/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clusters
import (
"context"

etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand All @@ -22,8 +23,21 @@ func UpdateClusterStatusForControlPlane(ctx context.Context, client client.Clien
return errors.Wrapf(err, "getting kubeadmcontrolplane")
}

var etcdadmCluster *etcdv1.EtcdadmCluster
if cluster.Spec.ExternalEtcdConfiguration != nil {
capiCluster, err := controller.GetCAPICluster(ctx, client, cluster)
if err != nil {
return errors.Wrap(err, "getting capi cluster")
}

etcdadmCluster, err = getEtcdadmCluster(ctx, client, capiCluster)
if err != nil {
return errors.Wrap(err, "reading etcdadm cluster")
}
}

updateControlPlaneInitializedCondition(cluster, kcp)
updateControlPlaneReadyCondition(cluster, kcp)
updateConditionsForEtcdAndControlPlane(cluster, kcp, etcdadmCluster)

return nil
}
Expand Down Expand Up @@ -68,6 +82,16 @@ func UpdateClusterStatusForCNI(ctx context.Context, cluster *anywherev1.Cluster)
}
}

// updateConditionsForEtcdAndControlPlane updates the ControlPlaneReady condition if etcdadm cluster is not ready.
func updateConditionsForEtcdAndControlPlane(cluster *anywherev1.Cluster, kcp *controlplanev1.KubeadmControlPlane, etcdadmCluster *etcdv1.EtcdadmCluster) {
// Make sure etcd cluster is ready before marking ControlPlaneReady status to true
if cluster.Spec.ExternalEtcdConfiguration != nil && !etcdadmClusterReady(etcdadmCluster) {
conditions.MarkFalse(cluster, anywherev1.ControlPlaneReadyCondition, anywherev1.RollingUpgradeInProgress, clusterv1.ConditionSeverityInfo, "Etcd is not ready")
return
}
updateControlPlaneReadyCondition(cluster, kcp)
}

// updateControlPlaneReadyCondition updates the ControlPlaneReady condition, after checking the state of the control plane
// in the cluster.
func updateControlPlaneReadyCondition(cluster *anywherev1.Cluster, kcp *controlplanev1.KubeadmControlPlane) {
Expand Down
Loading

0 comments on commit 439e7a3

Please sign in to comment.