diff --git a/Dockerfile b/Dockerfile index 46cff61df..66072d160 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM --platform=$BUILDPLATFORM golang:1.21 as builder +FROM --platform=$BUILDPLATFORM golang:1.21 AS builder # OS and Arch args ARG TARGETOS diff --git a/Jenkinsfile b/Jenkinsfile index 95b2201b8..a34b36e10 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -80,8 +80,8 @@ pipeline { sh "./snyk-linux test --severity-threshold=high --fail-on=all" // Scan the operator images - sh "./snyk-linux container test ${OPERATOR_CONTAINER_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" - sh "./snyk-linux container test ${OPERATOR_BUNDLE_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" + sh "./snyk-linux container test ${OPERATOR_CONTAINER_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" + sh "./snyk-linux container test ${OPERATOR_BUNDLE_IMAGE_CANDIDATE_NAME} --severity-threshold=high --file=Dockerfile --policy-path=.snyk --fail-on=all" } } } diff --git a/Makefile b/Makefile index d4181b9f5..132ef14bc 100644 --- a/Makefile +++ b/Makefile @@ -140,7 +140,7 @@ go-lint: golanci-lint ## Run golangci-lint against code. .PHONY: test test: manifests generate fmt vet envtest ## Run tests. # KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -progress -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS} + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" cd $(shell pwd)/test; go run github.com/onsi/ginkgo/v2/ginkgo -coverprofile cover.out -show-node-events -v -timeout=12h0m0s -focus=${FOCUS} --junit-report="junit.xml" -- ${ARGS} ##@ Build diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index d9427c54a..7d7e08856 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -124,6 +124,34 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" // +kubebuilder:validation:MinItems:=1 K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` + // Paused flag is used to pause the reconciliation for the AerospikeCluster. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Pause Reconcile" + Paused *bool `json:"paused,omitempty"` + // Operations is a list of on-demand operations to be performed on the Aerospike cluster. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Operations" + // +kubebuilder:validation:MaxItems:=1 + Operations []OperationSpec `json:"operations,omitempty"` +} + +type OperationKind string + +const ( + // OperationWarmRestart is the on-demand operation that leads to the warm restart of the aerospike pods + // (Restarting ASD in the pods). https://aerospike.com/docs/cloud/kubernetes/operator/Warm-restart + OperationWarmRestart OperationKind = "WarmRestart" + + // OperationPodRestart is the on-demand operation that leads to the restart of aerospike pods. 
+ OperationPodRestart OperationKind = "PodRestart" +) + +type OperationSpec struct { + // Kind is the type of operation to be performed on the Aerospike cluster. + // +kubebuilder:validation:Enum=WarmRestart;PodRestart + Kind OperationKind `json:"kind"` + // +kubebuilder:validation:MaxLength=20 + // +kubebuilder:validation:MinLength=1 + ID string `json:"id"` + PodList []string `json:"podList,omitempty"` } type SeedsFinderServices struct { @@ -699,6 +727,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` + // Operations is a list of on-demand operation to be performed on the Aerospike cluster. + Operations []OperationSpec `json:"operations,omitempty"` } // AerospikeClusterStatus defines the observed state of AerospikeCluster @@ -1046,6 +1076,12 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.K8sNodeBlockList = *k8sNodeBlockList } + if len(spec.Operations) != 0 { + operations := lib.DeepCopy(&spec.Operations).(*[]OperationSpec) + + status.Operations = *operations + } + return &status, nil } @@ -1145,5 +1181,10 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.K8sNodeBlockList = *k8sNodeBlockList } + if len(status.Operations) != 0 { + operations := lib.DeepCopy(&status.Operations).(*[]OperationSpec) + spec.Operations = *operations + } + return &spec, nil } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 1e060cd16..5c08d6af7 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -117,6 +117,12 @@ func (c *AerospikeCluster) ValidateUpdate(oldObj runtime.Object) (admission.Warn return nil, err } + if err := validateOperationUpdate( + &old.Spec, &c.Spec, &c.Status, + ); err != nil { + return nil, err + } + // Validate AerospikeConfig update if err := validateAerospikeConfigUpdate( aslog, incomingVersion, outgoingVersion, @@ -191,6 +197,10 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return err } + if err := c.validateOperation(); err != nil { + return err + } + // Storage should be validated before validating aerospikeConfig and fileStorage if err := validateStorage(&c.Spec.Storage, &c.Spec.PodSpec); err != nil { return err @@ -263,6 +273,19 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return c.validateSCNamespaces() } +func (c *AerospikeCluster) validateOperation() error { + // Nothing to validate if no operation + if len(c.Spec.Operations) == 0 { + return nil + } + + if c.Status.AerospikeConfig == nil { + return fmt.Errorf("operation cannot be added during aerospike cluster creation") + } + + return nil +} + func (c *AerospikeCluster) validateSCNamespaces() error { scNamespaceSet := sets.NewString() @@ -1292,20 +1315,22 @@ func validateSecurityConfigUpdate( func validateEnableSecurityConfig(newConfSpec, oldConfSpec *AerospikeConfigSpec) error { newConf := newConfSpec.Value oldConf := oldConfSpec.Value + oldSec, oldSecConfFound := oldConf["security"] - newSec, newSecConfFound := newConf["security"] + if !oldSecConfFound { + return nil + } - if oldSecConfFound && !newSecConfFound { + newSec, newSecConfFound := newConf["security"] + if !newSecConfFound { return fmt.Errorf("cannot remove cluster security config") 
} - if oldSecConfFound && newSecConfFound { - oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"] - newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"] + oldSecFlag, oldEnableSecurityFlagFound := oldSec.(map[string]interface{})["enable-security"] + newSecFlag, newEnableSecurityFlagFound := newSec.(map[string]interface{})["enable-security"] - if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) { - return fmt.Errorf("cannot disable cluster security in running cluster") - } + if oldEnableSecurityFlagFound && oldSecFlag.(bool) && (!newEnableSecurityFlagFound || !newSecFlag.(bool)) { + return fmt.Errorf("cannot disable cluster security in running cluster") } return nil @@ -2360,33 +2385,46 @@ func (c *AerospikeCluster) validateEnableDynamicConfigUpdate() error { return nil } -func getMinRunningInitVersion(pods map[string]AerospikePodStatus) (string, error) { - minVersion := "" +func validateOperationUpdate(oldSpec, newSpec *AerospikeClusterSpec, status *AerospikeClusterStatus) error { + if len(newSpec.Operations) == 0 { + return nil + } - for idx := range pods { - if pods[idx].InitImage != "" { - version, err := GetImageVersion(pods[idx].InitImage) - if err != nil { - return "", err - } + newOp := &newSpec.Operations[0] - if minVersion == "" { - minVersion = version - continue - } + var oldOp *OperationSpec - val, err := lib.CompareVersions(version, minVersion) - if err != nil { - return "", fmt.Errorf("failed to check image version: %v", err) - } + if len(oldSpec.Operations) != 0 { + oldOp = &oldSpec.Operations[0] + } - if val < 0 { - minVersion = version - } - } else { - return baseInitVersion, nil + if oldOp != nil && oldOp.ID == newOp.ID && !reflect.DeepEqual(oldOp, newOp) { + return fmt.Errorf("operation %s cannot be updated", newOp.ID) + } + + allPodNames := GetAllPodNames(status.Pods) + + podSet := sets.New(newSpec.Operations[0].PodList...) 
+ if !allPodNames.IsSuperset(podSet) { + return fmt.Errorf("invalid pod names in operation %v", podSet.Difference(allPodNames).UnsortedList()) + } + + // Don't allow any on-demand operation along with these cluster change: + // 1- scale up + // 2- racks added or removed + // 3- image update + // New pods won't be available for operation + if !reflect.DeepEqual(newSpec.Operations, status.Operations) { + switch { + case newSpec.Size > status.Size: + return fmt.Errorf("cannot change Spec.Operations along with cluster scale-up") + case len(newSpec.RackConfig.Racks) != len(status.RackConfig.Racks) || + len(newSpec.RackConfig.Racks) != len(oldSpec.RackConfig.Racks): + return fmt.Errorf("cannot change Spec.Operations along with rack addition/removal") + case newSpec.Image != status.Image || newSpec.Image != oldSpec.Image: + return fmt.Errorf("cannot change Spec.Operations along with image update") } } - return minVersion, nil + return nil } diff --git a/api/v1/utils.go b/api/v1/utils.go index 833b67b0a..ce14db874 100644 --- a/api/v1/utils.go +++ b/api/v1/utils.go @@ -7,6 +7,8 @@ import ( "regexp" "strings" + "k8s.io/apimachinery/pkg/util/sets" + v1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" @@ -536,3 +538,62 @@ func GetDefaultPasswordFilePath(aerospikeConfigSpec *AerospikeConfigSpec) *strin return &passFile } + +func getMinRunningInitVersion(pods map[string]AerospikePodStatus) (string, error) { + minVersion := "" + + for idx := range pods { + if pods[idx].InitImage != "" { + version, err := GetImageVersion(pods[idx].InitImage) + if err != nil { + return "", err + } + + if minVersion == "" { + minVersion = version + continue + } + + val, err := lib.CompareVersions(version, minVersion) + if err != nil { + return "", fmt.Errorf("failed to check image version: %v", err) + } + + if val < 0 { + minVersion = version + } + } else { + return baseInitVersion, nil + } + } + + return minVersion, nil +} + +func DistributeItems(totalItems, totalGroups int) []int { + itemsPerGroup, extraItems := totalItems/totalGroups, totalItems%totalGroups + + // Distributing nodes in given racks + var topology []int + + for groupIdx := 0; groupIdx < totalGroups; groupIdx++ { + itemsForThisGroup := itemsPerGroup + if groupIdx < extraItems { + itemsForThisGroup++ + } + + topology = append(topology, itemsForThisGroup) + } + + return topology +} + +func GetAllPodNames(pods map[string]AerospikePodStatus) sets.Set[string] { + podNames := make(sets.Set[string]) + + for podName := range pods { + podNames.Insert(podName) + } + + return podNames +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 41e05a81f..8a1a8f646 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -203,6 +203,18 @@ func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Paused != nil { + in, out := &in.Paused, &out.Paused + *out = new(bool) + **out = **in + } + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]OperationSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterSpec. 
@@ -300,6 +312,13 @@ func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSp *out = make([]string, len(*in)) copy(*out, *in) } + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]OperationSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterStatusSpec. @@ -824,6 +843,26 @@ func (in *MountOptions) DeepCopy() *MountOptions { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OperationSpec) DeepCopyInto(out *OperationSpec) { + *out = *in + if in.PodList != nil { + in, out := &in.PodList, &out.PodList + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OperationSpec. +func (in *OperationSpec) DeepCopy() *OperationSpec { + if in == nil { + return nil + } + out := new(OperationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PersistentVolumeSpec) DeepCopyInto(out *PersistentVolumeSpec) { *out = *in diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 2f9447451..b46b57772 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -312,6 +312,32 @@ spec: This value is used to create PodDisruptionBudget. Defaults to 1. Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true + operations: + description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + maxItems: 1 + type: array operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -356,6 +382,10 @@ spec: list by the operator type: string type: object + paused: + description: Paused flag is used to pause the reconciliation for the + AerospikeCluster. + type: boolean podSpec: description: Specify additional configuration for the Aerospike pods properties: @@ -9651,6 +9681,31 @@ spec: is the port requested by the user. Deprecated: MultiPodPerHost is now part of podSpec" type: boolean + operations: + description: Operations is a list of on-demand operation to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + type: array operatorClientCertSpec: description: Certificates to connect to Aerospike. If omitted then certs are taken from the secret 'aerospike-secret'. 
diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 95b650fb0..e88283472 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -71,9 +71,16 @@ spec: for more details. displayName: Max Unavailable path: maxUnavailable + - description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. + displayName: Operations + path: operations - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert + - description: Paused flag is used to pause the reconciliation for the AerospikeCluster. + displayName: Pause Reconcile + path: paused - description: Specify additional configuration for the Aerospike pods displayName: Pod Configuration path: podSpec diff --git a/controllers/pod.go b/controllers/pod.go index 1eee1214f..05451e987 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net" + "reflect" "strconv" "strings" "time" @@ -81,6 +82,9 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) requiredConfHash := confMap.Data[aerospikeConfHashFileName] + // Fetching all pods requested for on-demand operations. + onDemandQuickRestarts, onDemandPodRestarts := r.podsToRestart() + for idx := range pods { if ignorablePodNames.Has(pods[idx].Name) { continue @@ -136,7 +140,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, } restartTypeMap[pods[idx].Name] = r.getRollingRestartTypePod(rackState, pods[idx], confMap, addedNSDevices, - len(dynamicConfDiffPerPod[pods[idx].Name]) > 0) + len(dynamicConfDiffPerPod[pods[idx].Name]) > 0, onDemandQuickRestarts, onDemandPodRestarts) // Fallback to rolling restart in case of partial failure to recover with the desired Aerospike config if podStatus.DynamicConfigUpdateStatus == asdbv1.PartiallyFailed { @@ -150,6 +154,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap(rackState *RackState, func (r *SingleClusterReconciler) getRollingRestartTypePod( rackState *RackState, pod *corev1.Pod, confMap *corev1.ConfigMap, addedNSDevices []string, onlyDynamicConfigChange bool, + onDemandQuickRestarts, onDemandPodRestarts sets.Set[string], ) RestartType { restartType := noRestart @@ -221,6 +226,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( "newTCPPort", r.getReadinessProbe().TCPSocket.String()) } + if opType := r.onDemandOperationType(pod.Name, onDemandQuickRestarts, onDemandPodRestarts); opType != noRestart { + restartType = mergeRestartType(restartType, opType) + + r.Log.Info("Pod warm/cold restart requested. Need rolling restart", + "pod name", pod.Name, "operation", opType, "restartType", restartType) + } + return restartType } @@ -358,6 +370,8 @@ func (r *SingleClusterReconciler) restartPods( } restartedPods := make([]*corev1.Pod, 0, len(podsToRestart)) + restartedPodNames := make([]string, 0, len(podsToRestart)) + restartedASDPodNames := make([]string, 0, len(podsToRestart)) blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) 
for idx := range podsToRestart { @@ -366,33 +380,43 @@ func (r *SingleClusterReconciler) restartPods( restartType := restartTypeMap[pod.Name] if restartType == quickRestart { - // If ASD restart fails, then go ahead and restart the pod - if err := r.restartASDOrUpdateAerospikeConf(pod.Name, quickRestart); err == nil { - continue + // We assume that the pod server image supports pod warm restart. + if err := r.restartASDOrUpdateAerospikeConf(pod.Name, quickRestart); err != nil { + r.Log.Error(err, "Failed to warm restart pod", "podName", pod.Name) + return reconcileError(err) } - } - if blockedK8sNodes.Has(pod.Spec.NodeName) { - r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", - "podName", pod.Name) + restartedASDPodNames = append(restartedASDPodNames, pod.Name) + } else if restartType == podRestart { + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", + "podName", pod.Name) - if err := r.deleteLocalPVCs(rackState, pod); err != nil { + if err := r.deleteLocalPVCs(rackState, pod); err != nil { + return reconcileError(err) + } + } + + if err := r.Client.Delete(context.TODO(), pod); err != nil { + r.Log.Error(err, "Failed to delete pod") return reconcileError(err) } - } - if err := r.Client.Delete(context.TODO(), pod); err != nil { - r.Log.Error(err, "Failed to delete pod") - return reconcileError(err) - } + restartedPods = append(restartedPods, pod) + restartedPodNames = append(restartedPodNames, pod.Name) - restartedPods = append(restartedPods, pod) + r.Log.V(1).Info("Pod deleted", "podName", pod.Name) + } + } - r.Log.V(1).Info("Pod deleted", "podName", pod.Name) + if err := r.updateOperationStatus(restartedASDPodNames, restartedPodNames); err != nil { + return reconcileError(err) } if len(restartedPods) > 0 { - return r.ensurePodsRunningAndReady(restartedPods) + if result := r.ensurePodsRunningAndReady(restartedPods); !result.isSuccess { + return result + } } return reconcileSuccess() @@ -1466,3 +1490,182 @@ func (r *SingleClusterReconciler) patchPodStatus(ctx context.Context, patches [] return nil }) } + +func (r *SingleClusterReconciler) onDemandOperationType(podName string, onDemandQuickRestarts, + onDemandPodRestarts sets.Set[string]) RestartType { + switch { + case onDemandQuickRestarts.Has(podName): + return quickRestart + case onDemandPodRestarts.Has(podName): + return podRestart + } + + return noRestart +} + +func (r *SingleClusterReconciler) updateOperationStatus(restartedASDPodNames, restartedPodNames []string) error { + if len(restartedASDPodNames)+len(restartedPodNames) == 0 || len(r.aeroCluster.Spec.Operations) == 0 { + return nil + } + + statusOps := lib.DeepCopy(r.aeroCluster.Status.Operations).([]asdbv1.OperationSpec) + + allPodNames := asdbv1.GetAllPodNames(r.aeroCluster.Status.Pods) + + quickRestartsSet := sets.New(restartedASDPodNames...) + podRestartsSet := sets.New(restartedPodNames...) + + specOp := r.aeroCluster.Spec.Operations[0] + + var specPods sets.Set[string] + + // If no pod list is provided, it indicates that all pods need to be restarted. + if len(specOp.PodList) == 0 { + specPods = allPodNames + } else { + specPods = sets.New(specOp.PodList...) + } + + opFound := false + + for idx := range statusOps { + statusOp := &statusOps[idx] + if statusOp.ID == specOp.ID { + opFound = true + + if len(statusOp.PodList) != 0 { + statusPods := sets.New(statusOp.PodList...) 
+ + if statusOp.Kind == asdbv1.OperationWarmRestart { + if quickRestartsSet.Len() > 0 { + statusOp.PodList = statusPods.Union(quickRestartsSet.Intersection(specPods)).UnsortedList() + } + + // If the operation is a warm restart and the pod undergoes a cold restart for any reason, + // we will still consider the warm restart operation as completed for that pod. + if podRestartsSet.Len() > 0 { + statusOp.PodList = statusPods.Union(podRestartsSet.Intersection(specPods)).UnsortedList() + } + } + + if statusOp.Kind == asdbv1.OperationPodRestart && podRestartsSet != nil { + statusOp.PodList = statusPods.Union(podRestartsSet.Intersection(specPods)).UnsortedList() + } + } + + break + } + } + + if !opFound { + var podList []string + + if specOp.Kind == asdbv1.OperationWarmRestart { + if quickRestartsSet.Len() > 0 { + podList = quickRestartsSet.Intersection(specPods).UnsortedList() + } + + // If the operation is a warm restart and the pod undergoes a cold restart for any reason, + // we will still consider the warm restart operation as completed for that pod. + if podRestartsSet.Len() > 0 { + podList = append(podList, podRestartsSet.Intersection(specPods).UnsortedList()...) + } + } + + if specOp.Kind == asdbv1.OperationPodRestart && podRestartsSet != nil { + podList = podRestartsSet.Intersection(specPods).UnsortedList() + } + + statusOps = append(statusOps, asdbv1.OperationSpec{ + ID: specOp.ID, + Kind: specOp.Kind, + PodList: podList, + }) + } + + // Get the old object, it may have been updated in between. + newAeroCluster := &asdbv1.AerospikeCluster{} + if err := r.Client.Get( + context.TODO(), types.NamespacedName{ + Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, + }, newAeroCluster, + ); err != nil { + return err + } + + newAeroCluster.Status.Operations = statusOps + + if err := r.patchStatus(newAeroCluster); err != nil { + return fmt.Errorf("error updating status: %w", err) + } + + return nil +} + +// podsToRestart returns the pods that need to be restarted(quick/pod restart) based on the on-demand operations. +func (r *SingleClusterReconciler) podsToRestart() (quickRestarts, podRestarts sets.Set[string]) { + quickRestarts = make(sets.Set[string]) + podRestarts = make(sets.Set[string]) + + specOps := r.aeroCluster.Spec.Operations + statusOps := r.aeroCluster.Status.Operations + allPodNames := asdbv1.GetAllPodNames(r.aeroCluster.Status.Pods) + + // If no spec operations, no pods to restart + // If the Spec.Operations and Status.Operations are equal, no pods to restart. + if len(specOps) == 0 || reflect.DeepEqual(specOps, statusOps) { + return quickRestarts, podRestarts + } + + // Assuming only one operation is present in the spec. + specOp := specOps[0] + + var ( + podsToRestart, specPods sets.Set[string] + ) + // If no pod list is provided, it indicates that all pods need to be restarted. + if len(specOp.PodList) == 0 { + specPods = allPodNames + } else { + specPods = sets.New(specOp.PodList...) + } + + opFound := false + + // If the operation is not present in the status, all pods need to be restarted. + // If the operation is present in the status, only the pods that are not present in the status need to be restarted. + // If the operation is present in the status and podList is empty, no pods need to be restarted. + for _, statusOp := range statusOps { + if statusOp.ID != specOp.ID { + continue + } + + var statusPods sets.Set[string] + if len(statusOp.PodList) == 0 { + statusPods = allPodNames + } else { + statusPods = sets.New(statusOp.PodList...) 
+ } + + podsToRestart = specPods.Difference(statusPods) + opFound = true + + break + } + + if !opFound { + podsToRestart = specPods + } + + // Separate pods to be restarted based on operation type + if podsToRestart != nil && podsToRestart.Len() > 0 { + switch specOp.Kind { + case asdbv1.OperationWarmRestart: + quickRestarts.Insert(podsToRestart.UnsortedList()...) + case asdbv1.OperationPodRestart: + podRestarts.Insert(podsToRestart.UnsortedList()...) + } + } + + return quickRestarts, podRestarts +} diff --git a/controllers/rack.go b/controllers/rack.go index c8306759f..d15ae8917 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -86,7 +86,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info( + "Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) if res = r.reconcileRack( found, state, ignorablePodNames, failedPods, @@ -94,7 +96,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { return res } - r.Log.Info("Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info( + "Reconciled the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) } // 2. Again, fetch the pods for the rack and if there are failed pods then restart them. @@ -114,14 +118,20 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { // remove ignorable pods from failedPods failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { - r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info( + "Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) - if _, res = r.rollingRestartRack(found, state, ignorablePodNames, nil, - failedPods); !res.isSuccess { + if _, res = r.rollingRestartRack( + found, state, ignorablePodNames, nil, + failedPods, + ); !res.isSuccess { return res } - r.Log.Info("Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) + r.Log.Info( + "Restarted the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", getPodNames(failedPods), + ) // Requeue after 1 second to fetch latest CR object with updated pod status return reconcileRequeueAfter(1) } @@ -351,8 +361,10 @@ func (r *SingleClusterReconciler) deleteRacks( return reconcileSuccess() } -func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, - ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { +func (r *SingleClusterReconciler) upgradeOrRollingRestartRack( + found *appsv1.StatefulSet, rackState *RackState, + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { var res reconcileResult // Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not // Checking rack.spec, rack.status will not work. 
@@ -413,7 +425,9 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if rollingRestartInfo.needRestart { - found, res = r.rollingRestartRack(found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods) + found, res = r.rollingRestartRack( + found, rackState, ignorablePodNames, rollingRestartInfo.restartTypeMap, failedPods, + ) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -434,8 +448,10 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if len(failedPods) == 0 && rollingRestartInfo.needUpdateConf { - res = r.updateDynamicConfig(rackState, ignorablePodNames, - rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod) + res = r.updateDynamicConfig( + rackState, ignorablePodNames, + rollingRestartInfo.restartTypeMap, rollingRestartInfo.dynamicConfDiffPerPod, + ) if !res.isSuccess { if res.err != nil { r.Log.Error( @@ -473,9 +489,11 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat return found, reconcileSuccess() } -func (r *SingleClusterReconciler) updateDynamicConfig(rackState *RackState, +func (r *SingleClusterReconciler) updateDynamicConfig( + rackState *RackState, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, - dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap) reconcileResult { + dynamicConfDiffPerPod map[string]asconfig.DynamicConfigMap, +) reconcileResult { r.Log.Info("Update dynamic config in Aerospike pods") r.Recorder.Eventf( @@ -601,8 +619,10 @@ func (r *SingleClusterReconciler) reconcileRack( // before the scale down could complete. if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) || (!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) { - if res = r.setMigrateFillDelay(r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false, - nil); !res.isSuccess { + if res = r.setMigrateFillDelay( + r.getClientPolicy(), &rackState.Rack.AerospikeConfig, false, + nil, + ); !res.isSuccess { r.Log.Error(res.err, "Failed to revert migrate-fill-delay after scale down") return res } @@ -749,8 +769,10 @@ func (r *SingleClusterReconciler) scaleUpRack( return found, reconcileSuccess() } -func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, rackState *RackState, - ignorablePodNames sets.Set[string], failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { +func (r *SingleClusterReconciler) upgradeRack( + statefulSet *appsv1.StatefulSet, rackState *RackState, + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { var ( err error podList []*corev1.Pod @@ -815,7 +837,9 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r podsBatchList[0] = podsToUpgrade } else { // Create batch of pods - podsBatchList = getPodsBatchList(r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList)) + podsBatchList = getPodsBatchList( + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToUpgrade, len(podList), + ) } if len(podsBatchList) > 0 { @@ -907,7 +931,8 @@ func (r *SingleClusterReconciler) scaleDownRack( diffPods := *found.Spec.Replicas - desiredSize podsBatchList := getPodsBatchList( - r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList)) + r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize, oldPodList[:diffPods], len(oldPodList), + ) // Handle one batch podsBatch := 
podsBatchList[0] @@ -1051,9 +1076,11 @@ func (r *SingleClusterReconciler) scaleDownRack( return found, reconcileRequeueAfter(1) } -func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, rackState *RackState, +func (r *SingleClusterReconciler) rollingRestartRack( + found *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, - failedPods []*corev1.Pod) (*appsv1.StatefulSet, reconcileResult) { + failedPods []*corev1.Pod, +) (*appsv1.StatefulSet, reconcileResult) { r.Log.Info("Rolling restart AerospikeCluster statefulset pods") r.Recorder.Eventf( @@ -1132,7 +1159,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, } else { // Create batch of pods podsBatchList = getPodsBatchList( - r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList)) + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList), + ) } // Restart batch of pods @@ -1178,7 +1206,8 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, return found, reconcileSuccess() } -func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState, +func (r *SingleClusterReconciler) handleK8sNodeBlockListPods( + statefulSet *appsv1.StatefulSet, rackState *RackState, ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, ) (*appsv1.StatefulSet, reconcileResult) { if err := r.updateSTS(statefulSet, rackState); err != nil { @@ -1212,8 +1241,10 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 pod := podList[idx] if blockedK8sNodes.Has(pod.Spec.NodeName) { - r.Log.Info("Pod found in blocked nodes list, migrating to a different node", - "podName", pod.Name) + r.Log.Info( + "Pod found in blocked nodes list, migrating to a different node", + "podName", pod.Name, + ) podsToRestart = append(podsToRestart, pod) @@ -1222,7 +1253,8 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 } podsBatchList := getPodsBatchList( - r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList)) + r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, podsToRestart, len(podList), + ) // Restart batch of pods if len(podsBatchList) > 0 { @@ -1824,26 +1856,8 @@ func isContainerNameInStorageVolumeAttachments( return false } -func splitRacks(nodes, racks int) []int { - nodesPerRack, extraNodes := nodes/racks, nodes%racks - - // Distributing nodes in given racks - var topology []int - - for rackIdx := 0; rackIdx < racks; rackIdx++ { - nodesForThisRack := nodesPerRack - if rackIdx < extraNodes { - nodesForThisRack++ - } - - topology = append(topology, nodesForThisRack) - } - - return topology -} - func getConfiguredRackStateList(aeroCluster *asdbv1.AerospikeCluster) []RackState { - topology := splitRacks( + topology := asdbv1.DistributeItems( int(aeroCluster.Spec.Size), len(aeroCluster.Spec.RackConfig.Racks), ) diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 3f99f3efc..0c8ffc7db 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -82,6 +82,13 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error) return reconcile.Result{}, nil } + // Pause the reconciliation for the AerospikeCluster if the paused field is set to true. + // Deletion of the AerospikeCluster will not be paused. 
+ if asdbv1.GetBool(r.aeroCluster.Spec.Paused) { + r.Log.Info("Reconciliation is paused for this AerospikeCluster") + return reconcile.Result{}, nil + } + // Set the status to AerospikeClusterInProgress before starting any operations if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil { return reconcile.Result{}, err @@ -302,8 +309,10 @@ func (r *SingleClusterReconciler) recoverIgnorablePods() reconcileResult { return reconcileSuccess() } -func (r *SingleClusterReconciler) validateAndReconcileAccessControl(selectedPods []corev1.Pod, - ignorablePodNames sets.Set[string]) error { +func (r *SingleClusterReconciler) validateAndReconcileAccessControl( + selectedPods []corev1.Pod, + ignorablePodNames sets.Set[string], +) error { version, err := asdbv1.GetImageVersion(r.aeroCluster.Spec.Image) if err != nil { return err @@ -511,8 +520,6 @@ func (r *SingleClusterReconciler) updateAccessControlStatus() error { return fmt.Errorf("error updating status: %w", err) } - r.aeroCluster.Status.AerospikeClusterStatusSpec.AerospikeAccessControl = statusAerospikeAccessControl - r.Log.Info("Updated access control status", "status", newAeroCluster.Status) return nil @@ -965,7 +972,9 @@ func (r *SingleClusterReconciler) migrateInitialisedVolumeNames(ctx context.Cont } // Appending volume name as @ in initializedVolumes list - initializedVolumes = append(initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID)) + initializedVolumes = append( + initializedVolumes, fmt.Sprintf("%s@%s", oldFormatInitVolNames[oldVolIdx], pvcUID), + ) } } diff --git a/helm-charts/aerospike-cluster/README.md b/helm-charts/aerospike-cluster/README.md index 52d051aa7..7c640dd24 100644 --- a/helm-charts/aerospike-cluster/README.md +++ b/helm-charts/aerospike-cluster/README.md @@ -44,24 +44,30 @@ helm install aerospike ./aerospike-cluster/ \ ## Configurations -| Name | Description | Default | -| ---------- | ----------- | --------- | -| `replicas` | Aerospike cluster size | `3` | -| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` | -| `image.tag` | Aerospike server container image tag | `7.1.0.0` | -| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) | -| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) | -| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) | -| `aerospikeConfig` | Aerospike configuration | `{}` (nil) | -| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) | -| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) | -| `podSpec` | Aerospike pod spec configuration | `{}` (nil) | -| `rackConfig` | Aerospike rack configuration | `{}` (nil) | -| `storage` | Aerospike pod storage configuration | `{}` (nil) | -| `validationPolicy` | Validation policy | `{}` (nil) | -| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) | -| `seedsFinderServices` | Service (e.g. 
loadbalancer) for Aerospike cluster discovery | `{}` (nil) | -| `devMode` | Deploy Aerospike cluster in dev mode | `false` | +| Name | Description | Default | +| ---------- |---------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------| +| `replicas` | Aerospike cluster size | `3` | +| `image.repository` | Aerospike server container image repository | `aerospike/aerospike-server-enterprise` | +| `image.tag` | Aerospike server container image tag | `7.1.0.0` | +| `imagePullSecrets` | Secrets containing credentials to pull Aerospike container image from a private registry | `{}` (nil) | +| `customLabels` | Custom labels to add on the aerospikecluster resource | `{}` (nil) | +| `aerospikeAccessControl` | Aerospike access control configuration. Define users and roles to be created on the cluster. | `{}` (nil) | +| `aerospikeConfig` | Aerospike configuration | `{}` (nil) | +| `aerospikeNetworkPolicy` | Network policy (client access configuration) | `{}` (nil) | +| `commonName` | Base string for naming pods, services, stateful sets, etc. | Release name truncated to 63 characters (without hyphens) | +| `podSpec` | Aerospike pod spec configuration | `{}` (nil) | +| `rackConfig` | Aerospike rack configuration | `{}` (nil) | +| `storage` | Aerospike pod storage configuration | `{}` (nil) | +| `validationPolicy` | Validation policy | `{}` (nil) | +| `operatorClientCert` | Client certificates to connect to Aerospike | `{}` (nil) | +| `seedsFinderServices` | Service (e.g. loadbalancer) for Aerospike cluster discovery | `{}` (nil) | +| `maxUnavailable` | maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable before application disruption | `1` | +| `disablePDB` | Disable the PodDisruptionBudget creation for the Aerospike cluster | `false` | +| `enableDynamicConfigUpdate` | enableDynamicConfigUpdate enables dynamic config update flow of the operator | `false` | +| `rosterNodeBlockList` | rosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup | `[]` | +| `k8sNodeBlockList` | k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods | `[]` | +| `paused` | Pause reconciliation of the cluster | `false` | +| `devMode` | Deploy Aerospike cluster in dev mode | `false` | ### Default values in "dev" mode (`devMode=true`): diff --git a/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml b/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml index 5ba0dc431..738ec9300 100644 --- a/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml +++ b/helm-charts/aerospike-cluster/templates/aerospike-cluster-cr.yaml @@ -13,11 +13,18 @@ metadata: spec: # Aerospike cluster size - size: {{ .Values.replicas | default 3 }} + size: {{ .Values.replicas }} # Aerospike server docker image image: {{ .Values.image.repository | default "aerospike/aerospike-server-enterprise" }}:{{ .Values.image.tag | default "7.1.0.0" }} + ## maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable + ## before application disruption. + maxUnavailable: {{ .Values.maxUnavailable }} + + ## Disable the PodDisruptionBudget creation for the Aerospike cluster. + disablePDB: {{ .Values.disablePDB }} + # Aerospike access control configuration {{- with .Values.aerospikeAccessControl }} aerospikeAccessControl: {{- toYaml . 
| nindent 4 }} @@ -50,6 +57,9 @@ spec: {{- end }} + ## enableDynamicConfigUpdate enables dynamic config update flow of the operator. + enableDynamicConfigUpdate: {{ .Values.enableDynamicConfigUpdate }} + # Aerospike network policy {{- with .Values.aerospikeNetworkPolicy }} aerospikeNetworkPolicy: {{- toYaml . | nindent 4 }} @@ -106,3 +116,16 @@ spec: {{- with .Values.seedsFinderServices }} seedsFinderServices: {{- toYaml . | nindent 4 }} {{- end }} + + ## rosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup + {{- with .Values.rosterNodeBlockList }} + rosterNodeBlockList: {{- toYaml . | nindent 4 }} + {{- end }} + + ## k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. + {{- with .Values.k8sNodeBlockList }} + k8sNodeBlockList: {{- toYaml . | nindent 4 }} + {{- end }} + + ## Pause reconciliation of the cluster + paused: {{ .Values.paused }} diff --git a/helm-charts/aerospike-cluster/values.yaml b/helm-charts/aerospike-cluster/values.yaml index cb9774064..e661a40d6 100644 --- a/helm-charts/aerospike-cluster/values.yaml +++ b/helm-charts/aerospike-cluster/values.yaml @@ -11,7 +11,7 @@ image: tag: 7.1.0.0 ## In case the above image is pulled from a registry that requires -## authentication, a secret containining credentials can be added +## authentication, a secret containing credentials can be added ## imagePullSecrets: ## - secret_with_credentials_to_custom_registry imagePullSecrets: {} @@ -21,89 +21,75 @@ customLabels: {} ## Aerospike access control configuration aerospikeAccessControl: {} - # users: - # - name: admin - # secretName: auth-secret - # roles: - # - sys-admin - # - user-admin - # adminPolicy: - # # timeout in milliseconds - # timeout: 1000 - # roles: - # - name: - # privileges: [] - # whitelist: [] +# users: +# - name: admin +# secretName: auth-secret +# roles: +# - sys-admin +# - user-admin +# adminPolicy: +# # timeout in milliseconds +# timeout: 1000 +# roles: +# - name: +# privileges: [] +# whitelist: [] ## Aerospike Configuration aerospikeConfig: - # service: - # feature-key-file: /etc/aerospike/secrets/features.conf - - # security: - # enable-security: false - - # network: - # service: - # port: 3000 - # fabric: - # port: 3001 - # heartbeat: - # port: 3002 - - # namespaces: - # - name: test - # replication-factor: 2 - # storage-engine: - # type: memory - # data-size: 1073741824 # 1GiB - - -## Aerospike secrets -## To add feature key file, tls certificates etc. -## We may be able to add feature key file, certificates and other secrets dynamically during helm install -## when, -## 1. operator supports adding multiple secret sources, or -## 2. https://github.com/helm/helm/pull/8841 feature is added. 
- -# aerospikeSecretName: aerospike-secrets -# aerospikeSecretMountPath: /etc/aerospike/secrets/ +# service: +# feature-key-file: /etc/aerospike/secrets/features.conf +# +# network: +# service: +# port: 3000 +# fabric: +# port: 3001 +# heartbeat: +# port: 3002 +# +# namespaces: +# - name: test +# replication-factor: 2 +# storage-engine: +# type: memory +# data-size: 1073741824 # 1GiB ## Network policy aerospikeNetworkPolicy: {} - # access: pod - # alternateAccess: hostExternal - # tlsAccess: pod - # tlsAlternateAccess: hostExternal +# access: pod +# alternateAccess: hostExternal +# tlsAccess: pod +# tlsAlternateAccess: hostExternal ## Pod spec podSpec: {} - # Multi pod per host - # multiPodPerHost: true - # sidecars: - # - name: aerospike-prometheus-exporter - # image: "aerospike/aerospike-prometheus-exporter:1.1.6" - # ports: - # - containerPort: 9145 - # name: exporter +## Multi pod per host +# multiPodPerHost: true +# sidecars: +# - name: aerospike-prometheus-exporter +# image: aerospike/aerospike-prometheus-exporter:v1.18.0 +# ports: +# - containerPort: 9145 +# name: exporter ## Rack configuration rackConfig: {} ## Storage configuration storage: {} - # volumes: - # - name: aerospike-config-secret - # source: - # secret: - # secretName: aerospike-secret - # aerospike: - # path: /etc/aerospike/secrets +# volumes: +# - name: aerospike-config-secret +# source: +# secret: +# secretName: aerospike-secret +# aerospike: +# path: /etc/aerospike/secrets ## Validation policy validationPolicy: {} - # skipWorkDirValidate: true - # skipXdrDlogFileValidate: true +# skipWorkDirValidate: true +# skipXdrDlogFileValidate: true ## seedsFinderServices defines service (e.g. loadbalancer) to connect to Aerospike seedsFinderServices: {} @@ -111,5 +97,28 @@ seedsFinderServices: {} ## operatorClientCert defines certificates to connect to Aerospike operatorClientCert: {} +## maxUnavailable defines percentage/number of pods that can be allowed to go down or unavailable +## before application disruption. +maxUnavailable: 1 + +## Disable the PodDisruptionBudget creation for the Aerospike cluster. +disablePDB: false + +## enableDynamicConfigUpdate enables dynamic config update flow of the operator. +enableDynamicConfigUpdate: false + +## rosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup +## Replace the value with aerospike node id which needs to be blocked. +rosterNodeBlockList: [] +# - + +## k8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. +## Replace the value with kubernetes cluster node name which needs to be blocked. +k8sNodeBlockList: [] +# - + +## Pause reconciliation of the cluster +paused: false + ## Dev Mode devMode: false diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 2f9447451..b46b57772 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -312,6 +312,32 @@ spec: This value is used to create PodDisruptionBudget. Defaults to 1. Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true + operations: + description: Operations is a list of on-demand operations to be performed + on the Aerospike cluster. 
+ items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + maxItems: 1 + type: array operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -356,6 +382,10 @@ spec: list by the operator type: string type: object + paused: + description: Paused flag is used to pause the reconciliation for the + AerospikeCluster. + type: boolean podSpec: description: Specify additional configuration for the Aerospike pods properties: @@ -9651,6 +9681,31 @@ spec: is the port requested by the user. Deprecated: MultiPodPerHost is now part of podSpec" type: boolean + operations: + description: Operations is a list of on-demand operation to be performed + on the Aerospike cluster. + items: + properties: + id: + maxLength: 20 + minLength: 1 + type: string + kind: + description: Kind is the type of operation to be performed on + the Aerospike cluster. + enum: + - WarmRestart + - PodRestart + type: string + podList: + items: + type: string + type: array + required: + - id + - kind + type: object + type: array operatorClientCertSpec: description: Certificates to connect to Aerospike. If omitted then certs are taken from the secret 'aerospike-secret'. diff --git a/helm-charts/aerospike-kubernetes-operator/values.yaml b/helm-charts/aerospike-kubernetes-operator/values.yaml index 3befb4221..faea89650 100644 --- a/helm-charts/aerospike-kubernetes-operator/values.yaml +++ b/helm-charts/aerospike-kubernetes-operator/values.yaml @@ -8,7 +8,7 @@ operatorImage: pullPolicy: IfNotPresent ## In case the above image is pulled from a registry that requires -## authentication, a secret containining credentials can be added +## authentication, a secret containing credentials can be added ## imagePullSecrets: ## - secret_with_credentials_to_custom_registry imagePullSecrets: {} diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 8a4d6abfb..5addec1fc 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -119,17 +119,6 @@ func IsPodTerminating(pod *corev1.Pod) bool { return pod.DeletionTimestamp != nil } -// GetPod get pod from pod list by name -func GetPod(podName string, pods []corev1.Pod) *corev1.Pod { - for idx := range pods { - if podName == pods[idx].Name { - return &pods[idx] - } - } - - return nil -} - // GetRackIDFromPodName returns the rack id given a pod name. func GetRackIDFromPodName(podName string) (*int, error) { parts := strings.Split(podName, "-") diff --git a/test/access_control_test.go b/test/access_control_test.go index 78e2cf495..449c838f1 100644 --- a/test/access_control_test.go +++ b/test/access_control_test.go @@ -2191,6 +2191,8 @@ var _ = Describe( racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig.Racks = racks aeroCluster.Spec.RackConfig.Namespaces = []string{"test"} + // Setting incorrect secret name so that access control reconciler could not set the password for admin. + aeroCluster.Spec.AerospikeAccessControl.Users[0].SecretName = "incorrectSecretName" // This file is already added in the storage volume backed by the secret. 
aeroCluster.Spec.AerospikeConfig.Value["security"] = map[string]interface{}{ "default-password-file": "/etc/aerospike/secret/password.conf", @@ -2214,6 +2216,7 @@ var _ = Describe( Eventually(func() error { clientPolicy := getClientPolicy(aeroCluster, k8sClient) clientPolicy.Password = pass + clientPolicy.FailIfNotConnected = true client, cerr := getClientWithPolicy( pkgLog, aeroCluster, k8sClient, clientPolicy) @@ -2231,11 +2234,27 @@ var _ = Describe( return nil }, 5*time.Minute).ShouldNot(HaveOccurred()) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Set correct secret name for admin user credentials. + aeroCluster.Spec.AerospikeAccessControl.Users[0].SecretName = authSecretName + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + By("Try scaleup") err = scaleUpClusterTest( k8sClient, ctx, clusterNamespacedName, 1, ) Expect(err).ToNot(HaveOccurred()) + + if aeroCluster != nil { + err = deleteCluster( + k8sClient, ctx, aeroCluster, + ) + Expect(err).ToNot(HaveOccurred()) + } }) }) }, diff --git a/test/batch_restart_pods_test.go b/test/batch_restart_pods_test.go index 5a9342c79..6e05d9cab 100644 --- a/test/batch_restart_pods_test.go +++ b/test/batch_restart_pods_test.go @@ -30,7 +30,7 @@ func percent(val string) *intstr.IntOrString { } func count(val int) *intstr.IntOrString { - v := intstr.FromInt(val) + v := intstr.FromInt32(int32(val)) return &v } @@ -528,9 +528,7 @@ func batchRollingRestartTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() - err = updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) - - return err + return updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) } func batchUpgradeTest( @@ -546,9 +544,7 @@ func batchUpgradeTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.Image = unavailableImage - err = updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) - - return err + return updateClusterForBatchRestart(k8sClient, ctx, aeroCluster) } func rollingRestartTest( @@ -569,9 +565,7 @@ func rollingRestartTest( aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = unschedulableResource() } - err = updateCluster(k8sClient, ctx, aeroCluster) - - return err + return updateCluster(k8sClient, ctx, aeroCluster) } func upgradeTest( @@ -587,7 +581,5 @@ func upgradeTest( aeroCluster.Spec.RackConfig.RollingUpdateBatchSize = batchSize aeroCluster.Spec.Image = image - err = updateCluster(k8sClient, ctx, aeroCluster) - - return err + return updateCluster(k8sClient, ctx, aeroCluster) } diff --git a/test/cluster_test.go b/test/cluster_test.go index 85a3d8146..fb88bd186 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -79,9 +79,114 @@ var _ = Describe( UpdateClusterPre600(ctx) }, ) + Context( + "PauseReconcile", func() { + PauseReconcileTest(ctx) + }, + ) }, ) +func PauseReconcileTest(ctx goctx.Context) { + clusterNamespacedName := getNamespacedName( + "pause-reconcile", namespace, + ) + + BeforeEach( + func() { + aeroCluster := createDummyAerospikeCluster(clusterNamespacedName, 2) + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + It( + "Should pause reconcile", func() { 
+ // Testing over upgrade as it is a long-running operation + By("1. Start upgrade and pause at partial upgrade") + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = UpdateClusterImage(aeroCluster, nextImage) + Expect(err).ToNot(HaveOccurred()) + + err = k8sClient.Update(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + Eventually( + func() bool { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Check if at least one pod is upgraded + podUpgraded := false + for podName := range aeroCluster.Status.Pods { + podStatus := aeroCluster.Status.Pods[podName] + if podStatus.Image == nextImage { + pkgLog.Info("One Pod upgraded", "pod", podName, "image", podStatus.Image) + podUpgraded = true + break + } + } + + return podUpgraded + }, 2*time.Minute, 1*time.Second, + ).Should(BeTrue()) + + By("Pause reconcile") + err = setPauseFlag(ctx, clusterNamespacedName, ptr.To(true)) + Expect(err).ToNot(HaveOccurred()) + + By("2. Upgrade should fail") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = waitForAerospikeCluster( + k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, + getTimeout(1), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted}, + ) + Expect(err).To(HaveOccurred()) + + // Resume reconcile and Wait for all pods to be upgraded + By("3. Resume reconcile and upgrade should succeed") + err = setPauseFlag(ctx, clusterNamespacedName, nil) + Expect(err).ToNot(HaveOccurred()) + + By("Upgrade should succeed") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + err = waitForAerospikeCluster( + k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, + getTimeout(2), []asdbv1.AerospikeClusterPhase{asdbv1.AerospikeClusterCompleted}, + ) + Expect(err).ToNot(HaveOccurred()) + }, + ) +} + +func setPauseFlag(ctx goctx.Context, clusterNamespacedName types.NamespacedName, pause *bool) error { + aeroCluster, err := getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + + aeroCluster.Spec.Paused = pause + + return k8sClient.Update(ctx, aeroCluster) +} + func UpdateClusterPre600(ctx goctx.Context) { Context( "UpdateClusterPre600", func() { @@ -199,7 +304,8 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { nodeList = &v1.NodeList{} podList = &v1.PodList{} expectedPhases = []asdbv1.AerospikeClusterPhase{ - asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted} + asdbv1.AerospikeClusterInProgress, asdbv1.AerospikeClusterCompleted, + } ) clusterNamespacedName := getNamespacedName( @@ -238,27 +344,31 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { // As pod is in pending state, CR object will be updated continuously // This is put in eventually to retry Object Conflict error - Eventually(func() error { - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - val := intstr.FromInt32(1) - aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val - aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true - - // As pod is in pending state, CR object will be won't reach the final phase. 
- // So expectedPhases can be InProgress or Completed - return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - }, 1*time.Minute).ShouldNot(HaveOccurred()) + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + val := intstr.FromInt32(1) + aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val + aeroCluster.Spec.AerospikeConfig.Value["security"].(map[string]interface{})["enable-quotas"] = true + + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) By("Upgrade version") - Eventually(func() error { - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) - aeroCluster.Spec.Image = nextImage - // As pod is in pending state, CR object will be won't reach the final phase. - // So expectedPhases can be InProgress or Completed - return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) - }, 1*time.Minute).ShouldNot(HaveOccurred()) + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.Image = nextImage + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) By("Verify pending pod") podList, err = getPodList(aeroCluster, k8sClient) @@ -273,11 +383,43 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { // There should be only one pending pod Expect(counter).To(Equal(1)) + By("Executing on-demand operation") + Eventually( + func() error { + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + aeroCluster.Spec.Operations = operations + // As pod is in pending state, CR object won't reach the final phase. + // So expectedPhases can be InProgress or Completed + return updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) + }, 1*time.Minute, + ).ShouldNot(HaveOccurred()) + + By("Verify pending pod") + podList, err = getPodList(aeroCluster, k8sClient) + + counter = 0 + + for idx := range podList.Items { + if podList.Items[idx].Status.Phase == v1.PodPending { + counter++ + } + } + // There should be only one pending pod + Expect(counter).To(Equal(1)) + By("Scale down 1 pod") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) aeroCluster.Spec.Size-- - // As pod is in pending state, CR object will be won't reach the final phase. + // As pod is in pending state, CR object won't reach the final phase. 
// So expectedPhases can be InProgress or Completed err = updateClusterWithExpectedPhases(k8sClient, ctx, aeroCluster, expectedPhases) Expect(err).ToNot(HaveOccurred()) @@ -312,14 +454,19 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { ignorePodName := clusterNamespacedName.Name + "-1-1" pod := &v1.Pod{} - err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) + err := k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) Expect(err).ToNot(HaveOccurred()) pod.Spec.Containers[0].Image = wrongImage err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) + // Underlying kubernetes cluster should have atleast 6 nodes to run this test successfully. By("Delete rack with id 2") aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) Expect(err).ToNot(HaveOccurred()) @@ -330,19 +477,29 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Expect(err).ToNot(HaveOccurred()) By(fmt.Sprintf("Verify if failed pod %s is automatically recovered", ignorePodName)) - Eventually(func() bool { - err = k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) - - return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started && - pod.Status.ContainerStatuses[0].Ready - }, 1*time.Minute).Should(BeTrue()) - - Eventually(func() error { - return InterceptGomegaFailure(func() { - validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) - }) - }, 4*time.Minute).Should(BeNil()) + Eventually( + func() bool { + err = k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) + + return len(pod.Status.ContainerStatuses) != 0 && *pod.Status.ContainerStatuses[0].Started && + pod.Status.ContainerStatuses[0].Ready + }, 1*time.Minute, + ).Should(BeTrue()) + + Eventually( + func() error { + return InterceptGomegaFailure( + func() { + validateRoster(k8sClient, ctx, clusterNamespacedName, scNamespace) + }, + ) + }, 4*time.Minute, + ).Should(BeNil()) }, ) @@ -352,8 +509,12 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { ignorePodName := clusterNamespacedName.Name + "-1-1" pod := &v1.Pod{} - err := k8sClient.Get(ctx, types.NamespacedName{Name: ignorePodName, - Namespace: clusterNamespacedName.Namespace}, pod) + err := k8sClient.Get( + ctx, types.NamespacedName{ + Name: ignorePodName, + Namespace: clusterNamespacedName.Namespace, + }, pod, + ) Expect(err).ToNot(HaveOccurred()) pod.Spec.Containers[0].Image = wrongImage @@ -399,7 +560,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t nsList = append(nsList, getNonSCNamespaceConfig("bar", "/test/dev/xvdf1")) aeroCluster.Spec.AerospikeConfig.Value["namespaces"] = nsList - aeroCluster.Spec.Storage.Volumes = append(aeroCluster.Spec.Storage.Volumes, + aeroCluster.Spec.Storage.Volumes = append( + aeroCluster.Spec.Storage.Volumes, asdbv1.VolumeSpec{ Name: "bar", Source: asdbv1.VolumeSource{ @@ -416,7 +578,8 @@ func deployClusterForMaxIgnorablePods(ctx goctx.Context, clusterNamespacedName t ) racks := getDummyRackConf(1, 2) aeroCluster.Spec.RackConfig = asdbv1.RackConfig{ - Namespaces: []string{scNamespace}, Racks: racks} + Namespaces: []string{scNamespace}, Racks: racks, + } aeroCluster.Spec.PodSpec.MultiPodPerHost = ptr.To(false) err := deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) @@ 
-671,12 +834,14 @@ func UpdateTLSClusterTest(ctx goctx.Context) { network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{}) tlsList := network["tls"].([]interface{}) - tlsList = append(tlsList, map[string]interface{}{ - "name": "aerospike-a-0.test-runner1", - "cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem", - "key-file": "/etc/aerospike/secret/svc_key.pem", - "ca-file": "/etc/aerospike/secret/cacert.pem", - }) + tlsList = append( + tlsList, map[string]interface{}{ + "name": "aerospike-a-0.test-runner1", + "cert-file": "/etc/aerospike/secret/svc_cluster_chain.pem", + "key-file": "/etc/aerospike/secret/svc_key.pem", + "ca-file": "/etc/aerospike/secret/cacert.pem", + }, + ) network["tls"] = tlsList aeroCluster.Spec.AerospikeConfig.Value["network"] = network err = updateCluster(k8sClient, ctx, aeroCluster) diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go index 1a5b9a8a2..470b41a55 100644 --- a/test/large_reconcile_test.go +++ b/test/large_reconcile_test.go @@ -198,41 +198,16 @@ var _ = Describe( func loadDataInCluster( k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, ) error { - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return err - } - - hostList = append(hostList, host) - } - - clientP, err := as.NewClientWithPolicyAndHost(policy, hostList...) - if clientP == nil { - return fmt.Errorf( - "failed to create aerospike cluster asClient: %v", err, - ) + asClient, err := getAerospikeClient(aeroCluster, k8sClient) + if err != nil { + return err } - asClient := *clientP defer func() { fmt.Println("Closing Aerospike client") asClient.Close() }() - _, _ = asClient.WarmUp(-1) - keyPrefix := "testkey" size := 100 @@ -244,11 +219,6 @@ func loadDataInCluster( return readErr } - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } - pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), ) diff --git a/test/on_demand_operations_test.go b/test/on_demand_operations_test.go new file mode 100644 index 000000000..b3a5644f4 --- /dev/null +++ b/test/on_demand_operations_test.go @@ -0,0 +1,483 @@ +package test + +import ( + goctx "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "k8s.io/utils/ptr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "OnDemandOperations", func() { + + ctx := goctx.Background() + var clusterNamespacedName = getNamespacedName( + "operations", namespace, + ) + + aeroCluster := &asdbv1.AerospikeCluster{} + + BeforeEach( + func() { + // Create a 2 node cluster + aeroCluster = createDummyRackAwareAerospikeCluster( + clusterNamespacedName, 2, + ) + + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + _ = deleteCluster(k8sClient, ctx, aeroCluster) + }, + ) + + Context( + "When doing valid operations", func() { + + It( + "Should execute quickRestart operations on all pods", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationWarmRestart, + "operations-1-1": asdbv1.OperationWarmRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute podRestart operation on all pods", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should be able to replace/remove the running operations", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = k8sClient.Update(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() error { + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Operations[0].Kind = asdbv1.OperationPodRestart + aeroCluster.Spec.Operations[0].ID = "2" + + return updateCluster(k8sClient, ctx, aeroCluster) + }, 1*time.Minute).ShouldNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err 
= validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + + // Remove operations + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Operations = nil + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute operations on selected pods with dynamic config change", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.EnableDynamicConfigUpdate = ptr.To(true) + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = 18000 + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": "noRestart", + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute on-demand podRestart operations on all pods along with scale down", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.Size = 4 + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationPodRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Size = 2 + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + "operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should execute podRestart if podSpec is changed with on-demand warm restart", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + oldPodIDs, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster.Spec.PodSpec.AerospikeContainerSpec.Resources = schedulableResource("200Mi") + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + aeroCluster, err = getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operationTypeMap := map[string]asdbv1.OperationKind{ + "operations-1-0": asdbv1.OperationPodRestart, + 
"operations-1-1": asdbv1.OperationPodRestart, + } + + err = validateOperationTypes(ctx, aeroCluster, oldPodIDs, operationTypeMap) + Expect(err).ToNot(HaveOccurred()) + }, + ) + }, + ) + + Context( + "When doing invalid operations", func() { + It( + "Should fail if there are more than 1 operations", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + }, + { + Kind: asdbv1.OperationPodRestart, + ID: "2", + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if invalid pod name is mentioned in the pod list", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0", "invalid-pod"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if operationType is modified", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + // Modify operationType + operations[0].Kind = asdbv1.OperationPodRestart + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail if podList is modified", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + // Modify podList + operations[0].PodList = []string{"operations-1-1"} + aeroCluster.Spec.Operations = operations + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail any operation along with cluster scale-up", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Size++ + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + ) + + It( + "should fail any operation along with cluster upgrade", func() { + aeroCluster, err := getCluster( + k8sClient, ctx, clusterNamespacedName, + ) + Expect(err).ToNot(HaveOccurred()) + + operations := []asdbv1.OperationSpec{ + { + Kind: asdbv1.OperationWarmRestart, + ID: "1", + PodList: []string{"operations-1-0"}, + }, + } + + aeroCluster.Spec.Operations = operations + aeroCluster.Spec.Image = nextImage + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }, + 
) + }, + ) + }, +) + +func validateOperationTypes(ctx goctx.Context, aeroCluster *asdbv1.AerospikeCluster, pid map[string]podID, + operationTypeMap map[string]asdbv1.OperationKind) error { + newPodPidMap, err := getPodIDs(ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + for podName, opType := range operationTypeMap { + switch opType { + case asdbv1.OperationWarmRestart: + if newPodPidMap[podName].podUID != pid[podName].podUID || newPodPidMap[podName].asdPID == pid[podName].asdPID { + return fmt.Errorf("failed to quick restart pod %s", podName) + } + case asdbv1.OperationPodRestart: + if newPodPidMap[podName].podUID == pid[podName].podUID { + return fmt.Errorf("failed to restart pod %s", podName) + } + case "noRestart": + if newPodPidMap[podName].podUID != pid[podName].podUID || newPodPidMap[podName].asdPID != pid[podName].asdPID { + return fmt.Errorf("unexpected restart pod %s", podName) + } + } + } + + return nil +} diff --git a/test/storage_wipe_test.go b/test/storage_wipe_test.go index c6133edfe..9067a904d 100644 --- a/test/storage_wipe_test.go +++ b/test/storage_wipe_test.go @@ -3,7 +3,6 @@ package test import ( goctx "context" "fmt" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -211,40 +210,12 @@ func writeDataToCluster( k8sClient client.Client, namespaces []string, ) error { - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return err - } - - hostList = append(hostList, host) - } - - asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) - if asClient == nil { - return fmt.Errorf("aerospike client is nil %v", err) - } - - defer asClient.Close() - - if _, err = asClient.WarmUp(-1); err != nil { + asClient, err := getAerospikeClient(aeroCluster, k8sClient) + if err != nil { return err } - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } + defer asClient.Close() pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), @@ -277,45 +248,17 @@ func checkDataInCluster( ) (map[string]bool, error) { data := make(map[string]bool) - policy := getClientPolicy(aeroCluster, k8sClient) - policy.FailIfNotConnected = false - policy.Timeout = time.Minute * 2 - policy.UseServicesAlternate = true - policy.ConnectionQueueSize = 100 - policy.LimitConnectionsToQueueSize = true - hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) - - for podName := range aeroCluster.Status.Pods { - pod := aeroCluster.Status.Pods[podName] - - host, err := createHost(&pod) - if err != nil { - return nil, err - } - - hostList = append(hostList, host) - } - - asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) 
+ asClient, err := getAerospikeClient(aeroCluster, k8sClient) if err != nil { return nil, err } defer asClient.Close() - for !asClient.IsConnected() { - pkgLog.Info("Waiting for cluster to connect") - time.Sleep(2 * time.Second) - } - pkgLog.Info( "Loading record", "nodes", asClient.GetNodeNames(), ) - if _, err = asClient.WarmUp(-1); err != nil { - return nil, err - } - for _, ns := range namespaces { newKey, err := as.NewKey(ns, setName, key) if err != nil { @@ -327,8 +270,8 @@ func checkDataInCluster( return nil, nil } - if bin, ok := record.Bins[binName]; ok { - value := bin.(string) + if bin, exists := record.Bins[binName]; exists { + value, ok := bin.(string) if !ok { return nil, fmt.Errorf( diff --git a/test/suite_test.go b/test/suite_test.go index 1cd633125..b3c3ff2d3 100644 --- a/test/suite_test.go +++ b/test/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2021. +Copyright 2024. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/test_client.go b/test/test_client.go index 7ea76bdbd..295b07159 100644 --- a/test/test_client.go +++ b/test/test_client.go @@ -215,9 +215,8 @@ func getClientPolicy( RootCAs: getClusterServerPool( clientCertSpec, aeroCluster.Namespace, k8sClient, ), - Certificates: []tls.Certificate{}, - PreferServerCipherSuites: true, - MinVersion: tls.VersionTLS12, + Certificates: []tls.Certificate{}, + MinVersion: tls.VersionTLS12, // used only in testing // InsecureSkipVerify: true, } diff --git a/test/utils.go b/test/utils.go index e3c5bf5c5..302cbc049 100644 --- a/test/utils.go +++ b/test/utils.go @@ -26,6 +26,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + as "github.com/aerospike/aerospike-client-go/v7" asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" operatorUtils "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" lib "github.com/aerospike/aerospike-management-lib" @@ -212,7 +213,7 @@ func createAuthSecret( labels map[string]string, secretName, pass string, ) error { // Create authSecret - as := &corev1.Secret{ + secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName, Namespace: namespace, @@ -224,7 +225,7 @@ func createAuthSecret( }, } // use test context's create helper to create the object and add a cleanup function for the new object - err := k8sClient.Create(ctx, as) + err := k8sClient.Create(ctx, secret) if !errors.IsAlreadyExists(err) { return err } @@ -287,16 +288,20 @@ func isClusterStateValid( return false } - // Validate status - statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec) - if err != nil { - pkgLog.Error(err, "Failed to copy spec in status", "err", err) - return false - } + // Do not compare status with spec if cluster reconciliation is paused + // `paused` flag only exists in the spec and not in the status. + if !asdbv1.GetBool(aeroCluster.Spec.Paused) { + // Validate status + statusToSpec, err := asdbv1.CopyStatusToSpec(&newCluster.Status.AerospikeClusterStatusSpec) + if err != nil { + pkgLog.Error(err, "Failed to copy spec in status", "err", err) + return false + } - if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) { - pkgLog.Info("Cluster status is not matching the spec") - return false + if !reflect.DeepEqual(statusToSpec, &newCluster.Spec) { + pkgLog.Info("Cluster status is not matching the spec") + return false + } } // TODO: This is not valid for tests where maxUnavailablePods flag is used. 
@@ -322,6 +327,8 @@ func isClusterStateValid( aeroCluster.Spec.Image, ), ) + + return false } if newCluster.Labels[asdbv1.AerospikeAPIVersionLabel] != asdbv1.AerospikeAPIVersion { @@ -816,3 +823,45 @@ func getPasswordFromSecret(k8sClient client.Client, return string(passBytes), nil } + +func getAerospikeClient(aeroCluster *asdbv1.AerospikeCluster, k8sClient client.Client) (*as.Client, error) { + policy := getClientPolicy(aeroCluster, k8sClient) + policy.FailIfNotConnected = false + policy.Timeout = time.Minute * 2 + policy.UseServicesAlternate = true + policy.ConnectionQueueSize = 100 + policy.LimitConnectionsToQueueSize = true + + hostList := make([]*as.Host, 0, len(aeroCluster.Status.Pods)) + + for podName := range aeroCluster.Status.Pods { + pod := aeroCluster.Status.Pods[podName] + + host, err := createHost(&pod) + if err != nil { + return nil, err + } + + hostList = append(hostList, host) + } + + asClient, err := as.NewClientWithPolicyAndHost(policy, hostList...) + if asClient == nil { + return nil, fmt.Errorf( + "failed to create aerospike cluster asClient: %v", err, + ) + } + + _, _ = asClient.WarmUp(-1) + + // Wait for 5 minutes for cluster to connect + for j := 0; j < 150; j++ { + if isConnected := asClient.IsConnected(); isConnected { + break + } + + time.Sleep(time.Second * 2) + } + + return asClient, nil +} diff --git a/test/warm_restart_test.go b/test/warm_restart_test.go index f1fe64aa3..99ed226fc 100644 --- a/test/warm_restart_test.go +++ b/test/warm_restart_test.go @@ -28,29 +28,16 @@ var _ = Describe( WarmRestart(ctx) }, ) - It( - "Should cold start without tini", func() { - PodRestart(ctx) - }, - ) - }, ) }, ) func WarmRestart(ctx goCtx.Context) { - rollCluster(ctx, latestImage, true) -} - -func PodRestart(ctx goCtx.Context) { - image := fmt.Sprintf( - "aerospike/aerospike-server-enterprise:%s", "5.7.0.8", - ) - rollCluster(ctx, image, false) + rollCluster(ctx, latestImage) } -func rollCluster(ctx goCtx.Context, image string, expectWarmStart bool) { +func rollCluster(ctx goCtx.Context, image string) { clusterName := "warm-restart-cluster" clusterNamespacedName := getNamespacedName(clusterName, namespace) @@ -94,7 +81,7 @@ func rollCluster(ctx goCtx.Context, image string, expectWarmStart bool) { pkgLog.Info("Rolling restarted", "Markers", podToMarkerPresent) for _, marker := range podToMarkerPresent { - Expect(marker).To(Equal(expectWarmStart)) + Expect(marker).To(Equal(true)) } }
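The on-demand operations exercised by the tests above are driven entirely through the AerospikeCluster spec. As a minimal sketch (not part of this change set), the helper below shows the update pattern those tests repeat: fetch the latest CR, set Spec.Operations, and push the update. It reuses helpers already defined in this test package (getCluster, updateCluster, k8sClient); the function name, operation ID, and pod name are illustrative placeholders.

// triggerWarmRestart is a sketch of how the suite submits an on-demand
// operation. It assumes the package-level k8sClient and the existing
// getCluster/updateCluster test helpers.
func triggerWarmRestart(ctx goctx.Context, nsName types.NamespacedName, podName string) error {
	aeroCluster, err := getCluster(k8sClient, ctx, nsName)
	if err != nil {
		return err
	}

	aeroCluster.Spec.Operations = []asdbv1.OperationSpec{
		{
			Kind:    asdbv1.OperationWarmRestart,
			ID:      "demo-1",            // illustrative operation ID
			PodList: []string{podName},   // omit PodList to target all pods
		},
	}

	return updateCluster(k8sClient, ctx, aeroCluster)
}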