Skip to content

Commit

Permalink
Rewrite the sts-deletion logic
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Bigler <[email protected]>
  • Loading branch information
TheBigLee committed Sep 3, 2024
1 parent b0a855d commit 5c61b48
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 99 deletions.
113 changes: 69 additions & 44 deletions pkg/comp-functions/functions/spksmariadb/pvcresize.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeMariaDBInst
return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err))
}

if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" {
// The release has just been updated and paused and is waiting for the deletion job to finish
// The deletion job should remove the annotation once it's done.

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err))
}

sts := &appsv1.StatefulSet{}
err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset job: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
stsSize := int64(0)
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
newSize, found, err := unstructured.NestedString(values, "persistence", "size")
if !found {
return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
}

if err != nil {
return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName())
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create RBAC for the deletion job: %w", err))
}
}
return nil
}

err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
Expand Down Expand Up @@ -74,53 +135,17 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeMariaDBInst
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}

return nil
}

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return nil
}

err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
// We pause the release at this point to make sure that provider-helm doesn't update the
// release until the deletion job removed the sts
release.SetAnnotations(map[string]string{
"crossplane.io/paused": "true",
})
err = svc.SetDesiredKubeObject(release, mariadbRelease)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
return runtime.NewFatalResult(fmt.Errorf("Can't pause the release: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName())
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}
return nil
}

return nil
Expand Down
9 changes: 4 additions & 5 deletions pkg/comp-functions/functions/spksmariadb/script/recreate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ if [[ $foundsize != "$size" ]]; then
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true
kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true
# Poke the release so it tries again to create the sts
# We first set it to garbage to ensure that the release is in an invalid state, we use an invalid state so it doesn't
# actually deploy anything.
# Then we patch the right size to enforce an upgrade
# This is necessary as provider-helm doesn't actually retry failed helm deployments unless the values change.
# Upause the release so that the sts is recreated. We pause the release to avoid provider-helm updating the release
# before the sts is deleted.
# Then we first patch the siye to garbage and afterwards to the right size to enforce an upgrade
echo "Triggering sts re-creation"
kubectl annotate release "$release" "crossplane.io/paused-"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"foo\"}}}}}"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"$size\"}}}}}"
count=0
Expand Down
115 changes: 70 additions & 45 deletions pkg/comp-functions/functions/spksredis/pvcresize.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,67 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan
return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err))
}

if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" {
// The release has just been updated and paused and is waiting for the deletion job to finish
// The deletion job should remove the annotation once it's done.

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err))
}

sts := &appsv1.StatefulSet{}
err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset job: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
stsSize := int64(0)
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
newSize, found, err := unstructured.NestedString(values, replicaKey, "persistence", "size")
if !found {
return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
}

if err != nil {
return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create RBAC for the deletion job: %w", err))
}
}
return nil
}

err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
Expand All @@ -62,6 +123,7 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

stsSize := int64(0)
// Check the current size in the sts
if len(sts.Spec.VolumeClaimTemplates) > 0 {
Expand All @@ -78,53 +140,16 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}

return nil
}

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return nil
}

err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
// We pause the release at this point to make sure that provider-helm doesn't update the
// release until the deletion job removed the sts
release.SetAnnotations(map[string]string{
"crossplane.io/paused": "true",
})
err = svc.SetDesiredKubeObject(release, redisRelease)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
return runtime.NewFatalResult(fmt.Errorf("Can't pause the release: %w", err))
}
return nil
}

return nil
Expand Down
8 changes: 3 additions & 5 deletions pkg/comp-functions/functions/spksredis/script/recreate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ if [[ $foundsize != "$size" ]]; then
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true
kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true
# Poke the release so it tries again to create the sts
# We first set it to garbage to ensure that the release is in an invalid state, we use an invalid state so it doesn't
# actually deploy anything.
# Then we patch the right size to enforce an upgrade
# This is necessary as provider-helm doesn't actually retry failed helm deployments unless the values change.
# Upause the release so that the sts is recreated. We pause the release to avoid provider-helm updating the release
# before the sts is deleted.
echo "Triggering sts re-creation"
kubectl annotate release "$release" "crossplane.io/paused-"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"$replica_key\":{\"persistence\":{\"size\":\"foo\"}}}}}}"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"$replica_key\":{\"persistence\":{\"size\":\"$size\"}}}}}}"
count=0
Expand Down

0 comments on commit 5c61b48

Please sign in to comment.