Skip to content

Commit

Permalink
Node controller sets replica.spec.evictionRequested during auto eviction
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Weber <[email protected]>
  • Loading branch information
ejweber committed Nov 1, 2023
1 parent b93e9d4 commit 321e9f9
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 123 deletions.
101 changes: 20 additions & 81 deletions controller/instance_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func (imc *InstanceManagerController) syncInstanceManagerPDB(im *longhorn.Instan
return err
}

imPDB, err := imc.ds.GetPDBRO(imc.getPDBName(im))
imPDB, err := imc.ds.GetPDBRO(types.GetPDBName(im))
if err != nil && !datastore.ErrorIsNotFound(err) {
return err
}
Expand Down Expand Up @@ -666,7 +666,7 @@ func (imc *InstanceManagerController) cleanUpPDBForNonExistingIM() error {
if labelValue != types.LonghornLabelInstanceManager {
continue
}
if _, ok := ims[getIMNameFromPDBName(pdbName)]; ok {
if _, ok := ims[types.GetIMNameFromPDBName(pdbName)]; ok {
continue
}
if err := imc.ds.DeletePDB(pdbName); err != nil {
Expand All @@ -680,7 +680,7 @@ func (imc *InstanceManagerController) cleanUpPDBForNonExistingIM() error {
}

func (imc *InstanceManagerController) deleteInstanceManagerPDB(im *longhorn.InstanceManager) error {
name := imc.getPDBName(im)
name := types.GetPDBName(im)
imc.logger.Infof("Deleting %v PDB", name)
err := imc.ds.DeletePDB(name)
if err != nil && !datastore.ErrorIsNotFound(err) {
Expand Down Expand Up @@ -748,62 +748,31 @@ func (imc *InstanceManagerController) canDeleteInstanceManagerPDB(im *longhorn.I
targetReplicas = replicasOnCurrentNode
}

// For each replica in the target replica list,
// find out whether there is a PDB protected healthy replica of the same
// volume on another schedulable node.
// For each replica in the target replica list, find out whether there is a PDB protected healthy replica of the
// same volume on another schedulable node.
for _, replica := range targetReplicas {
vol, err := imc.ds.GetVolume(replica.Spec.VolumeName)
if err != nil {
return false, err
}
hasPDBOnAnotherNode := false
isUnusedReplicaOnCurrentNode := false

replicas, err := imc.ds.ListVolumeReplicas(vol.Name)
pdbProtectedHealthyReplicas, err := imc.ds.ListVolumePDBProtectedHealthyReplicas(replica.Spec.VolumeName)
if err != nil {
return false, err
}

hasPDBOnAnotherNode := false
isUnusedReplicaOnCurrentNode := false
for _, r := range replicas {
hasOtherHealthyReplicas := r.Spec.HealthyAt != "" && r.Spec.FailedAt == "" && r.Spec.NodeID != im.Spec.NodeID
if hasOtherHealthyReplicas {
unschedulable, err := imc.ds.IsKubeNodeUnschedulable(r.Spec.NodeID)
if err != nil {
return false, err
}
if unschedulable {
continue
}

var rIM *longhorn.InstanceManager
rIM, err = imc.getRunningReplicaInstancManager(r)
if err != nil {
return false, err
}
if rIM == nil {
continue
}

pdb, err := imc.ds.GetPDBRO(imc.getPDBName(rIM))
if err != nil && !datastore.ErrorIsNotFound(err) {
return false, err
}
if pdb != nil {
hasPDBOnAnotherNode = true
break
}
}
// If a replica has never been started, there is no data stored in this replica, and
// retaining it makes no sense for HA.
// Hence Longhorn doesn't need to block the PDB removal for the replica.
// This case typically happens on a newly created volume that hasn't been attached to any node.
// https://github.com/longhorn/longhorn/issues/2673
isUnusedReplicaOnCurrentNode = r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == im.Spec.NodeID
if isUnusedReplicaOnCurrentNode {
for _, pdbProtectedHealthyReplica := range pdbProtectedHealthyReplicas {
if pdbProtectedHealthyReplica.Spec.NodeID != im.Spec.NodeID {
hasPDBOnAnotherNode = true
break
}
}

// If a replica has never been started, there is no data stored in this replica, and retaining it makes no sense
// for HA. Hence Longhorn doesn't need to block the PDB removal for the replica. This case typically happens on
// a newly created volume that hasn't been attached to any node.
// https://github.com/longhorn/longhorn/issues/2673
isUnusedReplicaOnCurrentNode = replica.Spec.HealthyAt == "" &&
replica.Spec.FailedAt == "" &&
replica.Spec.NodeID == im.Spec.NodeID

if !hasPDBOnAnotherNode && !isUnusedReplicaOnCurrentNode {
return false, nil
}
Expand All @@ -812,24 +781,6 @@ func (imc *InstanceManagerController) canDeleteInstanceManagerPDB(im *longhorn.I
return true, nil
}

func (imc *InstanceManagerController) getRunningReplicaInstancManager(r *longhorn.Replica) (im *longhorn.InstanceManager, err error) {
if r.Status.InstanceManagerName == "" {
im, err = imc.ds.GetInstanceManagerByInstance(r)
if err != nil && !types.ErrorIsNotFound(err) {
return nil, err
}
} else {
im, err = imc.ds.GetInstanceManager(r.Status.InstanceManagerName)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
}
if im == nil || im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return nil, nil
}
return im, nil
}

func (imc *InstanceManagerController) areAllVolumesDetachedFromNode(nodeName string) (bool, error) {
detached, err := imc.areAllInstanceRemovedFromNodeByType(nodeName, longhorn.InstanceManagerTypeEngine)
if err != nil {
Expand Down Expand Up @@ -879,7 +830,7 @@ func (imc *InstanceManagerController) createInstanceManagerPDB(im *longhorn.Inst
func (imc *InstanceManagerController) generateInstanceManagerPDBManifest(im *longhorn.InstanceManager) *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: imc.getPDBName(im),
Name: types.GetPDBName(im),
Namespace: imc.namespace,
},
Spec: policyv1.PodDisruptionBudgetSpec{
Expand All @@ -891,18 +842,6 @@ func (imc *InstanceManagerController) generateInstanceManagerPDBManifest(im *lon
}
}

func (imc *InstanceManagerController) getPDBName(im *longhorn.InstanceManager) string {
return getPDBNameFromIMName(im.Name)
}

func getPDBNameFromIMName(imName string) string {
return imName
}

func getIMNameFromPDBName(pdbName string) string {
return pdbName
}

func (imc *InstanceManagerController) enqueueInstanceManager(instanceManager interface{}) {
key, err := controller.KeyFunc(instanceManager)
if err != nil {
Expand Down
97 changes: 55 additions & 42 deletions controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,6 @@ func (nc *NodeController) syncNode(key string) (err error) {
node.Status.Region, node.Status.Zone = types.GetRegionAndZone(kubeNode.Labels)
}

if err = nc.syncAutoEvictingStatus(node, kubeNode); err != nil {
return err
}

if err = nc.syncReplicaEvictionRequested(node); err != nil {
return err
}

if nc.controllerID != node.Name {
return nil
}
Expand Down Expand Up @@ -561,6 +553,10 @@ func (nc *NodeController) syncNode(key string) (err error) {
return err
}

if err = nc.syncReplicaEvictionRequested(node, kubeNode); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -1447,31 +1443,14 @@ func (nc *NodeController) createSnapshotMonitor() (mon monitor.Monitor, err erro
return mon, nil
}

func (nc *NodeController) syncAutoEvictingStatus(node *longhorn.Node, kubeNode *corev1.Node) error {
oldAutoEvicting := node.Status.AutoEvicting

func (nc *NodeController) syncReplicaEvictionRequested(node *longhorn.Node, kubeNode *corev1.Node) error {
log := getLoggerForNode(nc.logger, node)
node.Status.AutoEvicting = false
replicasToSync := []*longhorn.Replica{}
nodeDrainPolicy, err := nc.ds.GetSettingValueExisted(types.SettingNameNodeDrainPolicy)
if err != nil {
return err
}
if nodeDrainPolicy != string(types.NodeDrainPolicyBlockForEviction) {
node.Status.AutoEvicting = false // We only auto evict in response to the associated policy.
} else if kubeNode.Spec.Unschedulable {
node.Status.AutoEvicting = true
} else {
node.Status.AutoEvicting = false
}

if oldAutoEvicting != node.Status.AutoEvicting {
log := getLoggerForNode(nc.logger, node).WithFields(logrus.Fields{"nodeDrainPolicy": nodeDrainPolicy,
"kubernetesNodeUnschedulable": kubeNode.Spec.Unschedulable})
log.Infof("Changed auto eviction status to %t", node.Status.AutoEvicting)
}
return nil
}

func (nc *NodeController) syncReplicaEvictionRequested(node *longhorn.Node) error {
replicasToSync := []*longhorn.Replica{}

for diskName, diskSpec := range node.Spec.Disks {
diskStatus := node.Status.DiskStatus[diskName]
Expand All @@ -1480,19 +1459,28 @@ func (nc *NodeController) syncReplicaEvictionRequested(node *longhorn.Node) erro
if err != nil {
return err
}
shouldEvictReplica := nc.shouldEvictReplica(node, &diskSpec, replica)
shouldEvictReplica, err := nc.shouldEvictReplica(node, kubeNode, &diskSpec, replica, nodeDrainPolicy)
if err != nil {
replicaLog := log.WithField("replica", replica.Name)
replicaLog.Warn("Failed to check if replica should be evicted, will enqueue then resync node")
}
if replica.Spec.EvictionRequested != shouldEvictReplica {
replica.Spec.EvictionRequested = shouldEvictReplica
replicasToSync = append(replicasToSync, replica)
}

if replica.Spec.EvictionRequested && (nodeDrainPolicy == string(types.NodeDrainPolicyBlockForEviction) ||
nodeDrainPolicy == string(types.NodeDrainPolicyBlockForEvictionIfContainsLastReplica)) {
node.Status.AutoEvicting = true
}
}
}

for _, replica := range replicasToSync {
log := getLoggerForNode(nc.logger, node).WithField("replica", replica.Name)
replicaLog := log.WithField("replica", replica.Name)
log.Infof("Updating evictionRequested to %t", replica.Spec.EvictionRequested)
if _, err := nc.ds.UpdateReplica(replica); err != nil {
log.Warn("Failed to update evictionRequested, will enqueue then resync node")
replicaLog.Warn("Failed to update evictionRequested, will enqueue then resync node")
nc.enqueueNode(node)
continue
}
Expand All @@ -1501,18 +1489,43 @@ func (nc *NodeController) syncReplicaEvictionRequested(node *longhorn.Node) erro
return nil
}

func (nc *NodeController) shouldEvictReplica(node *longhorn.Node, diskSpec *longhorn.DiskSpec, replica *longhorn.Replica) bool {
// TODO: Should we cancel evictions if a node is down or deleted?
// if isDownOrDeleted, err := rc.ds.IsNodeDownOrDeleted(replica.Spec.NodeID); err != nil {
// log.WithError(err).Warn("Failed to check if node is down or deleted")
// return false
func (nc *NodeController) shouldEvictReplica(node *longhorn.Node, kubeNode *corev1.Node, diskSpec *longhorn.DiskSpec,
replica *longhorn.Replica, nodeDrainPolicy string) (bool, error) {
// TODO: The replica controller previously cancelled evictions when a node was down or deleted. Is there something
// we need to do here?
// if isDownOrDeleted, err := nc.ds.IsNodeDownOrDeleted(node.Spec.Name); err != nil {
// return false, err
// } else if isDownOrDeleted {
// return false
// return false, nil
// }

// Check if node has requested eviction or is attempting to auto-evict replicas.
if node.Spec.EvictionRequested || node.Status.AutoEvicting {
return true
if node.Spec.EvictionRequested || diskSpec.EvictionRequested {
return true, nil
}
if !kubeNode.Spec.Unschedulable {
return false, nil // Node drain policy only takes effect on cordoned nodes.
}
if nodeDrainPolicy == string(types.NodeDrainPolicyBlockForEviction) {
return true, nil
}
if nodeDrainPolicy != string(types.NodeDrainPolicyBlockForEvictionIfContainsLastReplica) {
return false, nil
}
return diskSpec.EvictionRequested

pdbProtectedHealthyReplicas, err := nc.ds.ListVolumePDBProtectedHealthyReplicas(replica.Spec.VolumeName)
if err != nil {
return false, err
}
hasPDBOnAnotherNode := false
for _, pdbProtectedHealthyReplica := range pdbProtectedHealthyReplicas {
if pdbProtectedHealthyReplica.Spec.NodeID != replica.Spec.NodeID {
hasPDBOnAnotherNode = true
break
}
}
if !hasPDBOnAnotherNode {
return true, nil
}

return false, nil
}
60 changes: 60 additions & 0 deletions datastore/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,66 @@ func IsAvailableHealthyReplica(r *longhorn.Replica) bool {
return true
}

func (s *DataStore) ListVolumePDBProtectedHealthyReplicas(volumeName string) (map[string]*longhorn.Replica, error) {
pdbProtectedHealthyReplicas := map[string]*longhorn.Replica{}
replicas, err := s.ListVolumeReplicas(volumeName)
if err != nil {
return nil, err
}

for _, replica := range replicas {
if replica.Spec.HealthyAt == "" || replica.Spec.FailedAt != "" {
continue
}

unschedulable, err := s.IsKubeNodeUnschedulable(replica.Spec.NodeID)
if err != nil {
return map[string]*longhorn.Replica{}, err
}
if unschedulable {
continue
}

instanceManager, err := s.getRunningReplicaInstanceManager(replica)
if err != nil {
return map[string]*longhorn.Replica{}, err
}
if instanceManager == nil {
continue
}

pdb, err := s.GetPDBRO(types.GetPDBName(instanceManager))
if err != nil && !ErrorIsNotFound(err) {
return map[string]*longhorn.Replica{}, err
}
if pdb != nil {
pdbProtectedHealthyReplicas[replica.Name] = replica
}
}

return pdbProtectedHealthyReplicas, nil
}

func (s *DataStore) getRunningReplicaInstanceManager(replica *longhorn.Replica) (*longhorn.InstanceManager, error) {
var instanceManager *longhorn.InstanceManager
var err error
if replica.Status.InstanceManagerName == "" {
instanceManager, err = s.GetInstanceManagerByInstance(replica)
if err != nil && !types.ErrorIsNotFound(err) {
return nil, err
}
} else {
instanceManager, err = s.GetInstanceManager(replica.Status.InstanceManagerName)
if err != nil && !ErrorIsNotFound(err) {
return nil, err
}
}
if instanceManager == nil || instanceManager.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return nil, nil
}
return instanceManager, nil
}

// IsReplicaRebuildingFailed returns true if the rebuilding replica failed not caused by network issues.
func IsReplicaRebuildingFailed(reusableFailedReplica *longhorn.Replica) bool {
replicaRebuildFailedCondition := types.GetCondition(reusableFailedReplica.Status.Conditions, longhorn.ReplicaConditionTypeRebuildFailed)
Expand Down
12 changes: 12 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,3 +1080,15 @@ func GetBackupTargetSchemeFromURL(backupTargetURL string) string {
return ValueUnknown
}
}

func GetPDBName(im *longhorn.InstanceManager) string {
return GetPDBNameFromIMName(im.Name)
}

func GetPDBNameFromIMName(imName string) string {
return imName
}

func GetIMNameFromPDBName(pdbName string) string {
return pdbName
}

0 comments on commit 321e9f9

Please sign in to comment.