Skip to content

Commit

Permalink
Merge pull request #313 from slickwarren/cwarren/v2.9/modular-snapshots
Browse files Browse the repository at this point in the history
[2.9] improve snapshot creation and validation
  • Loading branch information
caliskanugur authored Oct 10, 2024
2 parents ebfd445 + 8872af4 commit 47c902e
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 128 deletions.
68 changes: 59 additions & 9 deletions extensions/clusters/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package clusters
import (
"context"
"fmt"
"strings"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -400,17 +400,22 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err
if err != nil {
return
}

checkFuncWaitToBeInUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.Transitioning && !summarizedCluster.Error && (summarizedCluster.State == clusterStateUpdating || summarizedCluster.State == clusterStateUpgrading) {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)
}

Expand Down Expand Up @@ -440,16 +445,20 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro
return
}
checkFuncWaitUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.IsReady() {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)

}
Expand Down Expand Up @@ -483,12 +492,53 @@ func WaitClusterToBeUpgraded(client *rancher.Client, clusterID string) (err erro
return
}

func isClusterInaccessible(messages []string) (isInaccessible bool) {
clusterCPErrorMessage := "Cluster health check failed: Failed to communicate with API server during namespace check" // For GKE
clusterModifiedErrorMessage := "the object has been modified" // For provisioning node driver K3s and RKE2
// WaitOnClusterAfterSnapshot waits for a cluster to finish taking a snapshot and return to an active state.
func WaitOnClusterAfterSnapshot(client *rancher.Client, clusterID string) error {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return err
}

isTransitioning := cluster.State == nil || cluster.State.Transitioning

if !isTransitioning {
err = kwait.PollUntilContextTimeout(context.TODO(), defaults.FiveHundredMillisecondTimeout, defaults.OneMinuteTimeout, true, func(ctx context.Context) (bool, error) {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return false, err
}

// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.
if cluster.State == nil {
return false, nil
}
return cluster.State.Transitioning, nil
})
if err != nil {
return err
}
}

err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (bool, error) {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return false, err
}

if cluster.State == nil {
return false, nil
}
// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.

return cluster.State.Name == active, nil
})

return err
}

func isClusterInaccessible(messages, acceptableErrors []string) (isInaccessible bool) {
for _, message := range messages {
if strings.Contains(message, clusterCPErrorMessage) || strings.Contains(message, clusterModifiedErrorMessage) {
if slices.Contains(acceptableErrors, message) {
isInaccessible = true
break
}
Expand Down
Loading

0 comments on commit 47c902e

Please sign in to comment.