Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.8] improve snapshot creation and validation #314

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 59 additions & 9 deletions extensions/clusters/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package clusters
import (
"context"
"fmt"
"strings"
"slices"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -400,17 +400,22 @@ func WaitClusterToBeInUpgrade(client *rancher.Client, clusterID string) (err err
if err != nil {
return
}

checkFuncWaitToBeInUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.Transitioning && !summarizedCluster.Error && (summarizedCluster.State == clusterStateUpdating || summarizedCluster.State == clusterStateUpgrading) {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)
}

Expand Down Expand Up @@ -440,16 +445,20 @@ func WaitClusterUntilUpgrade(client *rancher.Client, clusterID string) (err erro
return
}
checkFuncWaitUpgrade := func(event watch.Event) (bool, error) {
acceptableErrorMessages := []string{
"Cluster health check failed: Failed to communicate with API server during namespace check",
"the object has been modified",
}
clusterUnstructured := event.Object.(*unstructured.Unstructured)
summarizedCluster := summary.Summarize(clusterUnstructured)

clusterInfo = logClusterInfoWithChanges(clusterID, clusterInfo, summarizedCluster)

if summarizedCluster.IsReady() {
return true, nil
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, nil
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message) {
} else if summarizedCluster.Error && !isClusterInaccessible(summarizedCluster.Message, acceptableErrorMessages) {
return false, errors.Wrap(err, clusterErrorStateMessage)

}
Expand Down Expand Up @@ -483,12 +492,53 @@ func WaitClusterToBeUpgraded(client *rancher.Client, clusterID string) (err erro
return
}

func isClusterInaccessible(messages []string) (isInaccessible bool) {
clusterCPErrorMessage := "Cluster health check failed: Failed to communicate with API server during namespace check" // For GKE
clusterModifiedErrorMessage := "the object has been modified" // For provisioning node driver K3s and RKE2
// WaitOnClusterAfterSnapshot waits for a cluster to finish taking a snapshot and return to an active state.
func WaitOnClusterAfterSnapshot(client *rancher.Client, clusterID string) error {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return err
}

isTransitioning := cluster.State == nil || cluster.State.Transitioning

if !isTransitioning {
err = kwait.PollUntilContextTimeout(context.TODO(), defaults.FiveHundredMillisecondTimeout, defaults.OneMinuteTimeout, true, func(ctx context.Context) (bool, error) {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return false, err
}

// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.
if cluster.State == nil {
return false, nil
}
return cluster.State.Transitioning, nil
})
if err != nil {
return err
}
}

err = kwait.PollUntilContextTimeout(context.TODO(), 1*time.Second, defaults.FifteenMinuteTimeout, true, func(ctx context.Context) (bool, error) {
cluster, err := client.Steve.SteveType(ProvisioningSteveResourceType).ByID(clusterID)
if err != nil {
return false, err
}

if cluster.State == nil {
return false, nil
}
// note, this intentionally ignores cluster.State.Error, as that can sometimes appear during an upgrade during snapshots.

return cluster.State.Name == active, nil
})

return err
}

func isClusterInaccessible(messages, acceptableErrors []string) (isInaccessible bool) {
for _, message := range messages {
if strings.Contains(message, clusterCPErrorMessage) || strings.Contains(message, clusterModifiedErrorMessage) {
if slices.Contains(acceptableErrors, message) {
isInaccessible = true
break
}
Expand Down
Loading