Bugfix 2.4.0 (#176)
* Added a check to disallow in-memory SC (strong-consistency) namespaces
* Update the access-control status right after reconciling access control so that the updated credentials can be used by later info calls (see the sketch after this list)
* Skip waiting for migrations and skip getAndSetRoster for AP mode in reconcile
* Determine removed namespaces by taking the difference between clusterNamespaces and specNamespaces
* Change reconcileRequeueAfter(0) to reconcileRequeueAfter(1) so that the requeue actually takes effect
* Fix the SC mode sample file (rosterBlockList -> rosterNodeBlockList)
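The second bullet is about ordering: credentials applied while reconciling access control have to be persisted to the status before the client policy for later info calls is built. Below is a rough sketch of that ordering with stand-in types; only updateAccessControlStatus and getClientPolicy correspond to methods in the diff below, while reconcileAccessControl and the struct fields are hypothetical.

```go
package main

import "fmt"

// Stand-in types; only updateAccessControlStatus and getClientPolicy correspond
// to methods visible in the diff below, the rest is illustrative.
type clientPolicy struct{ User, Password string }

type reconciler struct {
	specUser, specPass     string // desired credentials from the CR spec
	statusUser, statusPass string // credentials recorded in the CR status
}

// reconcileAccessControl is a hypothetical name for the step that applies the
// spec's users and roles to the running cluster.
func (r *reconciler) reconcileAccessControl() error { return nil }

// updateAccessControlStatus mirrors the new step in this commit: persist the
// applied access-control block to the status right away.
func (r *reconciler) updateAccessControlStatus() {
	r.statusUser, r.statusPass = r.specUser, r.specPass
}

// getClientPolicy builds the policy used by later info calls; after the status
// update it sees the freshly applied credentials.
func (r *reconciler) getClientPolicy() clientPolicy {
	return clientPolicy{User: r.statusUser, Password: r.statusPass}
}

func main() {
	r := &reconciler{specUser: "admin", specPass: "rotated-secret"}
	_ = r.reconcileAccessControl()
	r.updateAccessControlStatus() // persist before building the client policy
	fmt.Println("info calls authenticate as:", r.getClientPolicy().User)
}
```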
sud82 authored Jan 12, 2023
1 parent 14dc0b0 commit ffb75f0
Showing 13 changed files with 241 additions and 172 deletions.
24 changes: 11 additions & 13 deletions api/v1beta1/aerospikecluster_validating_webhook.go
@@ -273,12 +273,13 @@ func (c *AerospikeCluster) validateSCNamespaces() error {
for _, nsConfInterface := range nsList {
nsConf := nsConfInterface.(map[string]interface{})

isEnabled, err := isNSSCEnabled(nsConf)
if err != nil {
return err
}
isEnabled := isNSSCEnabled(nsConf)
if isEnabled {
tmpSCNamespaceSet.Insert(nsConf["name"].(string))

if isInMemoryNamespace(nsConf) {
return fmt.Errorf("in-memory SC namespace is not supported, namespace %v", nsConf["name"])
}
}
}

@@ -1082,10 +1083,7 @@ func validateNamespaceReplicationFactor(
return err
}

scEnabled, err := isNSSCEnabled(nsConf)
if err != nil {
return err
}
scEnabled := isNSSCEnabled(nsConf)

// clSize < rf is allowed in AP mode but not in sc mode
if scEnabled && (clSize < rf) {
@@ -1094,13 +1092,13 @@

return nil
}
func isNSSCEnabled(nsConf map[string]interface{}) (bool, error) {
func isNSSCEnabled(nsConf map[string]interface{}) bool {
scEnabled, ok := nsConf["strong-consistency"]
if !ok {
return false, nil
return false
}

return scEnabled.(bool), nil
return scEnabled.(bool)
}

func getNamespaceReplicationFactor(nsConf map[string]interface{}) (int, error) {
@@ -1371,7 +1369,7 @@ func validateStorageEngineDeviceListUpdate(nsConfList, statusNsConfList []interf
device := d.(string)
if deviceList[device] != "" && deviceList[device] != namespace {
return fmt.Errorf(
"device %s can not be re-used until complete cleanup namespace= %s, oldNamespace= %s",
"device %s can not be removed and re-used in a different namespace at the same time. It has to be removed first. currentNamespace `%s`, oldNamespace `%s`",
device, deviceList[device], namespace,
)
}
@@ -1383,7 +1381,7 @@ func validateStorageEngineDeviceListUpdate(nsConfList, statusNsConfList []interf
file := d.(string)
if fileList[file] != "" && fileList[file] != namespace {
return fmt.Errorf(
"file %s can not be re-used until complete cleanup namespace= %s, oldNamespace= %s",
"file %s can not be removed and re-used in a different namespace at the same time. It has to be removed first. currentNamespace `%s`, oldNamespace `%s`",
file, fileList[file], namespace,
)
}
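The new webhook check above relies on isInMemoryNamespace, whose body is not part of this hunk. A minimal sketch of such a check, assuming the operator's map-based namespace config where an in-memory namespace declares a storage-engine of type "memory":

```go
package main

import "fmt"

// isInMemoryNamespaceSketch is an illustrative stand-in for the operator's
// isInMemoryNamespace helper (its body is not shown in this diff). It assumes
// the map-based config shape used above.
func isInMemoryNamespaceSketch(nsConf map[string]interface{}) bool {
	storage, ok := nsConf["storage-engine"].(map[string]interface{})
	if !ok {
		return false
	}

	typeStr, ok := storage["type"].(string)

	return ok && typeStr == "memory"
}

func main() {
	ns := map[string]interface{}{
		"name":               "test",
		"strong-consistency": true,
		"storage-engine":     map[string]interface{}{"type": "memory"},
	}
	// With the new validation, this combination is rejected by the webhook.
	fmt.Println(isInMemoryNamespaceSketch(ns)) // true
}
```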
16 changes: 16 additions & 0 deletions api/v1beta1/utils.go
@@ -432,3 +432,19 @@ func GetMigrateFillDelay(asConfig *AerospikeConfigSpec) (int, error) {

return fillDelay, nil
}

// IsClusterSCEnabled returns true if cluster has a sc namespace
func IsClusterSCEnabled(aeroCluster *AerospikeCluster) bool {
// Look inside only 1st rack. SC namespaces should be same across all the racks
rack := aeroCluster.Spec.RackConfig.Racks[0]

nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})
for _, nsConfInterface := range nsList {
isEnabled := isNSSCEnabled(nsConfInterface.(map[string]interface{}))
if isEnabled {
return true
}
}

return false
}
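IsClusterSCEnabled only inspects the first rack because SC namespaces are expected to be identical across racks. A standalone sketch of the flag lookup it builds on (the namespace fixture below is made up):

```go
package main

import "fmt"

// anySCNamespace mirrors the walk that IsClusterSCEnabled performs over the
// first rack's namespace list: any namespace with "strong-consistency" set to
// true makes the whole cluster SC-enabled.
func anySCNamespace(nsList []interface{}) bool {
	for _, nsConfInterface := range nsList {
		nsConf := nsConfInterface.(map[string]interface{})

		scEnabled, ok := nsConf["strong-consistency"]
		if ok && scEnabled.(bool) {
			return true
		}
	}

	return false
}

func main() {
	nsList := []interface{}{
		map[string]interface{}{"name": "cache"}, // AP namespace, flag absent
		map[string]interface{}{"name": "bar", "strong-consistency": true},
	}
	fmt.Println(anySCNamespace(nsList)) // true -> SC-only reconcile steps run
}
```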
2 changes: 1 addition & 1 deletion config/samples/sc_mode_cluster_cr.yaml
@@ -8,7 +8,7 @@ spec:
size: 4
image: aerospike/aerospike-server-enterprise:6.1.0.1

rosterBlockList:
rosterNodeBlockList:
- 1A0
rackConfig:
namespaces:
4 changes: 2 additions & 2 deletions controllers/aero_info_calls.go
@@ -60,7 +60,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
return res
}

if err := r.validateSCClusterState(policy); err != nil {
if err := r.validateSCClusterState(policy, ignorablePods); err != nil {
return reconcileError(err)
}

@@ -71,7 +71,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady(
}

func (r *SingleClusterReconciler) quiescePods(policy *as.ClientPolicy, allHostConns []*deployment.HostConn, pods []*corev1.Pod) error {
removedNSes, err := r.removedNamespaces()
removedNSes, err := r.removedNamespaces(allHostConns)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated(
}

if len(activePods) != 0 {
r.Log.Info("Restart active pods with updated container image", "pods", getPodNames(failedPods))
r.Log.Info("Restart active pods with updated container image", "pods", getPodNames(activePods))
if res := r.waitForMultipleNodesSafeStopReady(activePods, ignorablePods); !res.isSuccess {
return res
}
Expand Down
26 changes: 19 additions & 7 deletions controllers/rack.go
@@ -538,6 +538,12 @@ func (r *SingleClusterReconciler) upgradeRack(
// Handle one batch
podsBatch := podsBatchList[0]

r.Log.Info("Calculated batch for doing rolling upgrade",
"rackPodList", getPodNames(podList),
"rearrangedPods", getPodNames(rearrangedPods),
"podsBatch", getPodNames(podsBatch),
"rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize)

podNames := getPodNames(podsBatch)

r.Recorder.Eventf(r.aeroCluster, corev1.EventTypeNormal, "PodImageUpdate",
@@ -553,7 +559,7 @@

// Handle the next batch in subsequent Reconcile.
if len(podsBatchList) > 1 {
return statefulSet, reconcileRequeueAfter(0)
return statefulSet, reconcileRequeueAfter(1)
}

// If it's last batch then go ahead
@@ -621,9 +627,9 @@ func (r *SingleClusterReconciler) scaleDownRack(
}

// Setup roster after migration.
if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList); err != nil {
if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePods); err != nil {
r.Log.Error(err, "Failed to set roster for cluster")
return found, reconcileRequeueAfter(0)
return found, reconcileRequeueAfter(1)
}
}

@@ -659,7 +665,7 @@ func (r *SingleClusterReconciler) scaleDownRack(
// This can be left to the user but if we would do it here on our own then we can reuse
// objects like pvc and service. These objects would have been removed if scaleup is left for the user.
// In case of rolling restart, no pod cleanup happens, therefor rolling config back is left to the user.
if err := r.validateSCClusterState(policy); err != nil {
if err = r.validateSCClusterState(policy, ignorablePods); err != nil {
// reset cluster size
newSize := *found.Spec.Replicas + 1
found.Spec.Replicas = &newSize
@@ -676,7 +682,7 @@
),
)
}
return found, reconcileRequeueAfter(0)
return found, reconcileRequeueAfter(1)
}

// Fetch new object
@@ -710,7 +716,7 @@ func (r *SingleClusterReconciler) scaleDownRack(
rackState.Rack.ID, found.Namespace, found.Name, *found.Spec.Replicas,
desiredSize,
)
return found, reconcileRequeueAfter(0)
return found, reconcileRequeueAfter(1)
}

func (r *SingleClusterReconciler) rollingRestartRack(
@@ -780,14 +786,20 @@ func (r *SingleClusterReconciler) rollingRestartRack(
// Handle one batch
podsBatch := podsBatchList[0]

r.Log.Info("Calculated batch for doing rolling restart",
"rackPodList", getPodNames(podList),
"rearrangedPods", getPodNames(rearrangedPods),
"podsBatch", getPodNames(podsBatch),
"rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize)

res := r.rollingRestartPods(rackState, podsBatch, ignorablePods, restartTypeMap)
if !res.isSuccess {
return found, res
}

// Handle next batch in subsequent Reconcile.
if len(podsBatchList) > 1 {
return found, reconcileRequeueAfter(0)
return found, reconcileRequeueAfter(1)
}

// If it's last batch then go ahead
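Several return paths above switch from reconcileRequeueAfter(0) to reconcileRequeueAfter(1). The helper itself is not shown in this diff; assuming it fills only the RequeueAfter field of a controller-runtime result, a zero duration is indistinguishable from "no requeue requested", so the follow-up batch or roster step would never be scheduled. A minimal sketch of that behaviour:

```go
package main

import (
	"fmt"
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// requeueAfter builds the kind of result the operator's reconcileRequeueAfter
// helper is assumed to produce (the helper is not part of this diff).
func requeueAfter(seconds int) reconcile.Result {
	return reconcile.Result{RequeueAfter: time.Duration(seconds) * time.Second}
}

func main() {
	// controller-runtime only schedules a delayed requeue when RequeueAfter > 0;
	// with a zero duration and Requeue unset, the request is simply forgotten,
	// which is why the 0 -> 1 change makes the requeue effective.
	for _, secs := range []int{0, 1} {
		res := requeueAfter(secs)
		fmt.Printf("RequeueAfter=%v requeues: %v\n", res.RequeueAfter, res.RequeueAfter > 0)
	}
}
```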
97 changes: 72 additions & 25 deletions controllers/reconciler.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
"reflect"
"strings"

@@ -133,8 +134,20 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
return reconcile.Result{}, err
}

// Update the AerospikeCluster status.
if err := r.updateAccessControlStatus(); err != nil {
r.Log.Error(err, "Failed to update AerospikeCluster access control status")
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeWarning, "StatusUpdateFailed",
"Failed to update AerospikeCluster access control status %s/%s",
r.aeroCluster.Namespace, r.aeroCluster.Name,
)

return reconcile.Result{}, err
}

// Use policy from spec after setting up access control
policy := r.getClientPolicyFromSpec()
policy := r.getClientPolicy()

// revert migrate-fill-delay to original value if it was set to 0 during scale down
// Passing first rack from the list as all the racks will have same migrate-fill-delay
@@ -144,14 +157,16 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
return reconcile.Result{}, res.err
}

if res := r.waitForClusterStability(policy, allHostConns); !res.isSuccess {
return res.result, res.err
}
if asdbv1beta1.IsClusterSCEnabled(r.aeroCluster) {
if res := r.waitForClusterStability(policy, allHostConns); !res.isSuccess {
return res.result, res.err
}

// Setup roster
if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList); err != nil {
r.Log.Error(err, "Failed to set roster for cluster")
return reconcile.Result{}, err
// Setup roster
if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, nil); err != nil {
r.Log.Error(err, "Failed to set roster for cluster")
return reconcile.Result{}, err
}
}

// Update the AerospikeCluster status.
@@ -333,6 +348,41 @@ func (r *SingleClusterReconciler) updateStatus() error {
return nil
}

func (r *SingleClusterReconciler) updateAccessControlStatus() error {
if r.aeroCluster.Spec.AerospikeAccessControl == nil {
return nil
}

r.Log.Info("Update access control status for AerospikeCluster")

// Get the old object, it may have been updated in between.
newAeroCluster := &asdbv1beta1.AerospikeCluster{}
if err := r.Client.Get(
context.TODO(), types.NamespacedName{
Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace,
}, newAeroCluster,
); err != nil {
return err
}

// AerospikeAccessControl
statusAerospikeAccessControl := &asdbv1beta1.AerospikeAccessControlSpec{}
lib.DeepCopy(
statusAerospikeAccessControl, r.aeroCluster.Spec.AerospikeAccessControl,
)

newAeroCluster.Status.AerospikeClusterStatusSpec.AerospikeAccessControl = statusAerospikeAccessControl

if err := r.patchStatus(newAeroCluster); err != nil {
return fmt.Errorf("error updating status: %w", err)
}

r.aeroCluster = newAeroCluster
r.Log.Info("Updated access control status", "status", newAeroCluster.Status)

return nil
}

func (r *SingleClusterReconciler) createStatus() error {

r.Log.Info("Creating status for AerospikeCluster")
@@ -669,28 +719,25 @@ func (r *SingleClusterReconciler) checkPreviouslyFailedCluster() error {
return nil
}

func (r *SingleClusterReconciler) removedNamespaces() ([]string, error) {

var ns []string
statusNamespaces := make(map[string]bool)
specNamespaces := make(map[string]bool)
func (r *SingleClusterReconciler) removedNamespaces(allHostConns []*deployment.HostConn) ([]string, error) {
nodesNamespaces, err := deployment.GetClusterNamespaces(r.Log, r.getClientPolicy(), allHostConns)
if err != nil {
return nil, err
}

for _, rackStatus := range r.aeroCluster.Status.RackConfig.Racks {
for _, statusNamespace := range rackStatus.AerospikeConfig.Value["namespaces"].([]interface{}) {
statusNamespaces[statusNamespace.(map[string]interface{})["name"].(string)] = true
}
statusNamespaces := sets.NewString()
for _, namespaces := range nodesNamespaces {
statusNamespaces.Insert(namespaces...)
}

specNamespaces := sets.NewString()
for _, rackSpec := range r.aeroCluster.Spec.RackConfig.Racks {
for _, specNamespace := range rackSpec.AerospikeConfig.Value["namespaces"].([]interface{}) {
specNamespaces[specNamespace.(map[string]interface{})["name"].(string)] = true
for _, namespace := range rackSpec.AerospikeConfig.Value["namespaces"].([]interface{}) {
specNamespaces.Insert(namespace.(map[string]interface{})["name"].(string))
}
}

for statusNamespace := range statusNamespaces {
if !specNamespaces[statusNamespace] {
ns = append(ns, statusNamespace)
}
}
return ns, nil
removedNamespaces := statusNamespaces.Difference(specNamespaces)

return removedNamespaces.List(), nil
}
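removedNamespaces now asks the cluster nodes which namespaces they actually serve and subtracts the namespaces declared in the spec, using the k8s.io/apimachinery string sets imported above. A small standalone illustration of that set difference (the namespace names are made up):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Namespaces currently reported by the cluster nodes (illustrative values).
	clusterNamespaces := sets.NewString("test", "bar", "cache")
	// Namespaces declared in the CR spec.
	specNamespaces := sets.NewString("test", "bar")

	// Anything the nodes still serve but the spec no longer declares has been removed.
	removed := clusterNamespaces.Difference(specNamespaces)
	fmt.Println(removed.List()) // [cache]
}
```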
17 changes: 9 additions & 8 deletions controllers/strong_consistency.go
@@ -3,32 +3,33 @@ package controllers
import (
"github.com/aerospike/aerospike-management-lib/deployment"
as "github.com/ashishshinde/aerospike-client-go/v6"
corev1 "k8s.io/api/core/v1"
)

func (r *SingleClusterReconciler) getAndSetRoster(policy *as.ClientPolicy, rosterNodeBlockList []string) error {
hostConns, err := r.newAllHostConn()
func (r *SingleClusterReconciler) getAndSetRoster(policy *as.ClientPolicy, rosterNodeBlockList []string, ignorablePods []corev1.Pod) error {
allHostConns, err := r.newAllHostConnWithOption(ignorablePods)
if err != nil {
return err
}

removedNSes, err := r.removedNamespaces()
removedNSes, err := r.removedNamespaces(allHostConns)
if err != nil {
return err
}

return deployment.GetAndSetRoster(r.Log, hostConns, policy, rosterNodeBlockList, removedNSes)
return deployment.GetAndSetRoster(r.Log, allHostConns, policy, rosterNodeBlockList, removedNSes)
}

func (r *SingleClusterReconciler) validateSCClusterState(policy *as.ClientPolicy) error {
hostConns, err := r.newAllHostConn()
func (r *SingleClusterReconciler) validateSCClusterState(policy *as.ClientPolicy, ignorablePods []corev1.Pod) error {
allHostConns, err := r.newAllHostConnWithOption(ignorablePods)
if err != nil {
return err
}

removedNSes, err := r.removedNamespaces()
removedNSes, err := r.removedNamespaces(allHostConns)
if err != nil {
return err
}

return deployment.ValidateSCClusterState(r.Log, hostConns, policy, removedNSes)
return deployment.ValidateSCClusterState(r.Log, allHostConns, policy, removedNSes)
}
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/aerospike/aerospike-kubernetes-operator
go 1.18

require (
github.com/aerospike/aerospike-management-lib v0.0.0-20230110105455-65200ca3a494
github.com/aerospike/aerospike-management-lib v0.0.0-20230111155501-5021d4245958
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/ashishshinde/aerospike-client-go/v6 v6.0.1-0.20220606044039-77304169d3a4
github.com/evanphx/json-patch v4.11.0+incompatible