Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a paused flag to pause the reconciliation of the AerospikeCluster #302

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
// +kubebuilder:validation:MinItems:=1
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
// Paused flag is used to pause the reconciliation for the AerospikeCluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile"
Paused *bool `json:"paused,omitempty"`
abhishekdwivedi3060 marked this conversation as resolved.
Show resolved Hide resolved
// Operations is a list of on-demand operations to be performed on the Aerospike cluster.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations"
// +kubebuilder:validation:MaxItems:=1
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ spec:
- description: Certificates to connect to Aerospike.
displayName: Operator Client Cert
path: operatorClientCert
- description: Paused flag is used to pause the reconciliation for the AerospikeCluster.
displayName: Pause Reconcile
path: paused
- description: Specify additional configuration for the Aerospike pods
displayName: Pod Configuration
path: podSpec
Expand Down
7 changes: 7 additions & 0 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,13 @@ func (r *SingleClusterReconciler) handleNSOrDeviceRemovalForIgnorablePods(
func (r *SingleClusterReconciler) reconcileRack(
found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod,
) reconcileResult {
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
// This check is not strictly necessary here. It is already checked in the parent reconcile function.
// But, it is added here to avoid unnecessary reconciliation of rack when reconcileRack is called in a loop.
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcileRequeueAfter(1)
}

r.Log.Info(
"Reconcile existing Aerospike cluster statefulset", "stsName",
found.Name,
Expand Down
17 changes: 14 additions & 3 deletions controllers/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
return reconcile.Result{}, nil
}

// Pause the reconciliation for the AerospikeCluster if the paused field is set to true.
// Deletion of the AerospikeCluster will not be paused.
if asdbv1.GetBool(r.aeroCluster.Spec.Paused) {
r.Log.Info("Reconciliation is paused for this AerospikeCluster")
return reconcile.Result{}, nil
}

// Set the status to AerospikeClusterInProgress before starting any operations
if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -301,8 +308,10 @@ func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult {
return reconcileSuccess()
}

func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string]) error {
func (r *SingleClusterReconciler) validateAndReconcileAccessControl(
selectedPods []corev1.Pod,
ignorablePodNames sets.Set[string],
) error {
version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image)
if err != nil {
return err
Expand Down Expand Up @@ -959,7 +968,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont
}

// Appending volume name as <vol_name>@<pvcUID> in initializedVolumes list
initializedVolumes = append(initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID))
initializedVolumes = append(
initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID),
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ spec:
list by the operator
type: string
type: object
paused:
description: Paused flag is used to pause the reconciliation for the
AerospikeCluster.
type: boolean
podSpec:
description: Specify additional configuration for the Aerospike pods
properties:
Expand Down
218 changes: 158 additions & 60 deletions test/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,78 @@ var _ = Describe(
UpdateClusterPre600(ctx)
},
)
Context(
"PauseReconcile", func() {
PauseReconcileTest(ctx)
},
)
},
)

func PauseReconcileTest(ctx goctx.Context) {
clusterNamespacedName := getNamespacedName(
"pause-reconcile", namespace,
)

BeforeEach(
func() {
aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 2)
err := deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())
},
)

AfterEach(
func() {
aeroCluster, err := getCluster(
k8sClient, ctx, clusterNamespacedName,
)
Expect(err).ToNot(HaveOccurred())

_ = deleteCluster(k8sClient, ctx, aeroCluster)
},
)

It(
"Should pause reconcile", func() {
// Pause reconcile and then apply operation
// Testing over upgrade as it is a long-running operation
By("Pause reconcile")
err := setPauseFlag(ctx, clusterNamespacedName, ptr.To(true))
Expect(err).ToNot(HaveOccurred())

By("Start upgrade, it should fail")
err = upgradeClusterTest(k8sClient, ctx, clusterNamespacedName, nextImage)
Expect(err).To(HaveOccurred())

By("Resume reconcile")
err = setPauseFlag(ctx, clusterNamespacedName, nil)
Expect(err).ToNot(HaveOccurred())

By("Upgrade should succeed")
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

err = waitForAerospikeCluster(
k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval,
getTimeout(2), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted},
)
Expect(err).ToNot(HaveOccurred())
},
)
}

func setPauseFlag(ctx goctx.Context, clusterNamespacedName types.NamespacedName, pause *bool) error {
aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName)
if err != nil {
return err
}

aeroCluster.Spec.Paused = pause

return updateCluster(k8sClient, ctx, aeroCluster)
}

func UpdateClusterPre600(ctx goctx.Context) {
Context(
"UpdateClusterPre600", func() {
Expand Down Expand Up @@ -199,7 +268,8 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
nodeList = &v1.NodeList{}
podList = &v1.PodList{}
expectedPhases = []asdbv1.AerospikeClusterPhase{
asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted}
asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted,
}
)

clusterNamespacedName := getNamespacedName(
Expand Down Expand Up @@ -238,27 +308,31 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {

// As pod is in pending state, CR object will be updated continuously
// This is put in eventually to retry Object Conflict error
Eventually(func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())
val := intstr.FromInt32(1)
aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val
aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true

// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute).ShouldNot(HaveOccurred())
Eventually(
func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())
val := intstr.FromInt32(1)
aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val
aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true

// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute,
).ShouldNot(HaveOccurred())

By("Upgrade version")
Eventually(func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())
aeroCluster.Spec.Image = nextImage
// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute).ShouldNot(HaveOccurred())
Eventually(
func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())
aeroCluster.Spec.Image = nextImage
// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute,
).ShouldNot(HaveOccurred())

By("Verify pending pod")
podList, err = getPodList(aeroCluster, k8sClient)
Expand All @@ -274,21 +348,23 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
Expect(counter).To(Equal(1))

By("Executing on-demand operation")
Eventually(func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

operations := []asdbv1.OperationSpec{
{
Kind: asdbv1.OperationWarmRestart,
ID: "1",
},
}
aeroCluster.Spec.Operations = operations
// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute).ShouldNot(HaveOccurred())
Eventually(
func() error {
aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
Expect(err).ToNot(HaveOccurred())

operations := []asdbv1.OperationSpec{
{
Kind: asdbv1.OperationWarmRestart,
ID: "1",
},
}
aeroCluster.Spec.Operations = operations
// As pod is in pending state, CR object won't reach the final phase.
// So expectedPhases can be InProgress or Completed
return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases)
}, 1*time.Minute,
).ShouldNot(HaveOccurred())

By("Verify pending pod")
podList, err = getPodList(aeroCluster, k8sClient)
Expand Down Expand Up @@ -342,8 +418,12 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
ignorePodName := clusterNamespacedName.Name + "-1-1"
pod := &v1.Pod{}

err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace}, pod)
err := k8sClient.Get(
ctx, types.NamespacedName{
Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace,
}, pod,
)
Expect(err).ToNot(HaveOccurred())

pod.Spec.Containers[0].Image = wrongImage
Expand All @@ -361,19 +441,29 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
Expect(err).ToNot(HaveOccurred())

By(fmt.Sprintf("Verify if failed pod %s is automatically recovered", ignorePodName))
Eventually(func() bool {
err = k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace}, pod)

return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started &&
pod.Status.ContainerStatuses[0].Ready
}, 1*time.Minute).Should(BeTrue())

Eventually(func() error {
return InterceptGomegaFailure(func() {
validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace)
})
}, 4*time.Minute).Should(BeNil())
Eventually(
func() bool {
err = k8sClient.Get(
ctx, types.NamespacedName{
Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace,
}, pod,
)

return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started &&
pod.Status.ContainerStatuses[0].Ready
}, 1*time.Minute,
).Should(BeTrue())

Eventually(
func() error {
return InterceptGomegaFailure(
func() {
validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace)
},
)
}, 4*time.Minute,
).Should(BeNil())
},
)

Expand All @@ -383,8 +473,12 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
ignorePodName := clusterNamespacedName.Name + "-1-1"
pod := &v1.Pod{}

err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace}, pod)
err := k8sClient.Get(
ctx, types.NamespacedName{
Name: ignorePodName,
Namespace: clusterNamespacedName.Namespace,
}, pod,
)
Expect(err).ToNot(HaveOccurred())

pod.Spec.Containers[0].Image = wrongImage
Expand Down Expand Up @@ -430,7 +524,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t
nsList = append(nsList, getNonSCNamespaceConfig("bar", "/test/dev/xvdf1"))
aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList

aeroCluster.Spec.Storage.Volumes = append(aeroCluster.Spec.Storage.Volumes,
aeroCluster.Spec.Storage.Volumes = append(
aeroCluster.Spec.Storage.Volumes,
asdbv1.VolumeSpec{
Name: "bar",
Source: asdbv1.VolumeSource{
Expand All @@ -447,7 +542,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t
)
racks := getDummyRackConf(1, 2)
aeroCluster.Spec.RackConfig = asdbv1.RackConfig{
Namespaces: []string{scNamespace}, Racks: racks}
Namespaces: []string{scNamespace}, Racks: racks,
}
aeroCluster.Spec.PodSpec.MultiPodPerHost = ptr.To(false)
err := deployCluster(k8sClient, ctx, aeroCluster)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -702,12 +798,14 @@ func UpdateTLSClusterTest(ctx goctx.Context) {

network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{})
tlsList := network["tls"].([]interface{})
tlsList = append(tlsList, map[string]interface{}{
"name": "aerospike-a-0.test-runner1",
"cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem",
"key-file": "/etc/aerospike/secret/svc_key.pem",
"ca-file": "/etc/aerospike/secret/cacert.pem",
})
tlsList = append(
tlsList, map[string]interface{}{
"name": "aerospike-a-0.test-runner1",
"cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem",
"key-file": "/etc/aerospike/secret/svc_key.pem",
"ca-file": "/etc/aerospike/secret/cacert.pem",
},
)
network["tls"] = tlsList
aeroCluster.Spec.AerospikeConfig.Value["network"] = network
err = updateCluster(k8sClient, ctx, aeroCluster)
Expand Down
Loading
Loading