Skip to content

Commit

Permalink
Rewrite the sts-deletion logic
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Bigler <[email protected]>
  • Loading branch information
TheBigLee committed Sep 4, 2024
1 parent e122f9f commit a063a3b
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 147 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ clean:
get-crds:
./hack/get_crds.sh https://github.com/crossplane-contrib/provider-helm provider-helm apis/release apis/helm
./hack/get_crds.sh https://github.com/crossplane-contrib/provider-kubernetes provider-kubernetes apis/object/v1alpha2 apis/kubernetes

# There is currently a bug with the serialization if `inline` and `omitempty` are set: https://github.com/crossplane/function-sdk-go/issues/161
sed -i 's/inline,omitempty/inline/g' apis/helm/release/v1beta1/types.go
# provider-sql needs manual fixes... Running this every time would break them.
# The crossplane code generator only works if the code is valid, but the code is not valid until the code generator has run...
#./hack/get_crds.sh https://github.com/crossplane-contrib/provider-sql provider-sql apis/ apis/sql
Expand Down
2 changes: 1 addition & 1 deletion apis/helm/release/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type NamespacedName struct {

// DataKeySelector defines required spec to access a key of a configmap or secret
type DataKeySelector struct {
NamespacedName `json:",inline,omitempty"`
NamespacedName `json:",inline"`
Key string `json:"key,omitempty"`
Optional bool `json:"optional,omitempty"`
}
Expand Down
113 changes: 69 additions & 44 deletions pkg/comp-functions/functions/spksmariadb/pvcresize.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeMariaDBInst
return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err))
}

if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" {
// The release has just been updated and paused and is waiting for the deletion job to finish
// The deletion job should remove the annotation once it's done.

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err))
}

sts := &appsv1.StatefulSet{}
err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset job: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
stsSize := int64(0)
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
newSize, found, err := unstructured.NestedString(values, "persistence", "size")
if !found {
return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
}

if err != nil {
return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName())
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create RBAC for the deletion job: %w", err))
}
}
return nil
}

err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
Expand Down Expand Up @@ -74,53 +135,17 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeMariaDBInst
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}
// We pause the release at this point to make sure that provider-helm doesn't update the
// release until the deletion job removed the sts
release.SetAnnotations(map[string]string{
"crossplane.io/paused": "true",
})

return nil
}

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return nil
}

err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName())
err = svc.SetDesiredComposedResourceWithName(release, mariadbRelease)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
return runtime.NewFatalResult(fmt.Errorf("Can't pause the release: %w", err))
}
return nil
}

return nil
Expand Down
61 changes: 34 additions & 27 deletions pkg/comp-functions/functions/spksmariadb/script/recreate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,37 @@ found=$(kubectl -n "$namespace" get sts "$name" -o json --ignore-not-found)

foundsize=$(echo -En "$found" | jq -r '.spec.volumeClaimTemplates[] | select(.metadata.name=="data") | .spec.resources.requests.storage')

if [[ $foundsize != "$size" ]]; then
echo "PVC sizes don't match, deleting sts"
# We try to delete the sts and wait for 5s. On APPUiO it can happen that the
# deletion with orphan doesn't go through and the sts is stuck with an orphan finalizer.
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true
kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true
# Poke the release so it tries again to create the sts
# We first set it to garbage to ensure that the release is in an invalid state, we use an invalid state so it doesn't
# actually deploy anything.
# Then we patch the right size to enforce an upgrade
# This is necessary as provider-helm doesn't actually retry failed helm deployments unless the values change.
echo "Triggering sts re-creation"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"foo\"}}}}}"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"$size\"}}}}}"
count=0
while ! kubectl -n "$namespace" get sts "$name" && [[ count -lt 300 ]]; do
echo "waiting for sts to re-appear"
count=$count+1
sleep 1
done
[[ $count -lt 300 ]] || (echo "Waited for 5 minutes for sts to re-appear"; exit 1)
echo "Set label on sts to trigger the statefulset-resize-controller"
kubectl -n "$namespace" label sts "$name" --overwrite "sts-resize.vshn.net/resize-inplace=true"
else
echo "Sizes match, nothing to do"
fi
paused=$(kubectl get release "$release" -o jsonpath='{.metadata.annotations.crossplane\.io\/paused}' --ignore-not-found)

i=0
while [[ i -lt 300 ]]; do
if [[ $foundsize != "$size" ]]; then
echo "PVC sizes don't match, deleting sts"
# We try to delete the sts and wait for 5s. On APPUiO it can happen that the
# deletion with orphan doesn't go through and the sts is stuck with an orphan finalizer.
# So if the delete hasn't returned after 5s we forcefully patch away the finalizer.
kubectl -n "$namespace" delete sts "$name" --cascade=orphan --ignore-not-found --wait=true --timeout 5s || true
kubectl -n "$namespace" patch sts "$name" -p '{"metadata":{"finalizers":null}}' || true
# Upause the release so that the sts is recreated. We pause the release to avoid provider-helm updating the release
# before the sts is deleted.
# Then we first patch the siye to garbage and afterwards to the right size to enforce an upgrade
echo "Triggering sts re-creation"
kubectl annotate release "$release" "crossplane.io/paused-"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"foo\"}}}}}"
kubectl patch release "$release" --type merge -p "{\"spec\":{\"forProvider\":{\"values\":{\"persistence\":{\"size\":\"$size\"}}}}}"
count=0
while ! kubectl -n "$namespace" get sts "$name" && [[ count -lt 300 ]]; do
echo "waiting for sts to re-appear"
count=$count+1
sleep 1
done
[[ $count -lt 300 ]] || (echo "Waited for 5 minutes for sts to re-appear"; exit 1)
echo "Set label on sts to trigger the statefulset-resize-controller"
kubectl -n "$namespace" label sts "$name" --overwrite "sts-resize.vshn.net/resize-inplace=true"
break
else
echo "Sizes match, nothing to do"
fi
i=$i+1
sleep 1
done
119 changes: 72 additions & 47 deletions pkg/comp-functions/functions/spksredis/pvcresize.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,81 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan
return runtime.NewFatalResult(fmt.Errorf("cannot parse release values from desired release: %w", err))
}

if val, ok := release.GetAnnotations()["crossplane.io/paused"]; ok && val == "true" {
// The release has just been updated and paused and is waiting for the deletion job to finish
// The deletion job should remove the annotation once it's done.

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job, but release is paused: %w", err))
}

sts := &appsv1.StatefulSet{}
err = svc.GetObservedKubeObject(sts, comp.GetName()+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset job: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
stsSize := int64(0)
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
newSize, found, err := unstructured.NestedString(values, replicaKey, "persistence", "size")
if !found {
return runtime.NewFatalResult(fmt.Errorf("disk size not found in observed release"))
}

if err != nil {
return runtime.NewFatalResult(fmt.Errorf("failed to read size from observed release: %w", err))
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create RBAC for the deletion job: %w", err))
}
}
return nil
}

err = addStsObserver(svc, comp)
if err != nil {
return runtime.NewWarningResult(fmt.Errorf("cannot observe sts: %w", err).Error())
}

sts := &appsv1.StatefulSet{}
err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
err = svc.GetObservedKubeObject(sts, comp.GetName()+"-sts-observer")
if err == runtime.ErrNotFound {
return nil
}
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

stsSize := int64(0)
// Check the current size in the sts
if len(sts.Spec.VolumeClaimTemplates) > 0 {
Expand All @@ -78,53 +140,16 @@ func ResizeSpksPVCs(ctx context.Context, comp *spksv1alpha1.CompositeRedisInstan
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
}

return nil
}

xJob := &xkubev1.Object{}
err = svc.GetObservedComposedResource(xJob, comp.Name+"-sts-deleter")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed deletion job: %w", err))
}
// If there's no job observed, we're done here.
if err == runtime.ErrNotFound {
return nil
}

err = svc.GetObservedKubeObject(sts, comp.Name+"-sts-observer")
if err != nil && err != runtime.ErrNotFound {
return runtime.NewFatalResult(fmt.Errorf("cannot get observed statefulset: %w", err))
}

// If the xkube object has been created it's still possible that the actual job hasn't been observedJob.
observedJob := len(xJob.Status.AtProvider.Manifest.Raw) > 0

// Check the sts if it has been updated
if len(sts.Spec.VolumeClaimTemplates) > 0 {
stsSize, _ = sts.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage().AsInt64()
}
desiredSize, err := getSizeAsInt(newSize)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot parse desired size: %w", err))
}
stsUpdated := stsSize == desiredSize

deletionJob := &batchv1.Job{}
if observedJob {
err := json.Unmarshal(xJob.Status.AtProvider.Manifest.Raw, deletionJob)
// We pause the release at this point to make sure that provider-helm doesn't update the
// release until the deletion job removed the sts
release.SetAnnotations(map[string]string{
"crossplane.io/paused": "true",
})
err = svc.SetDesiredComposedResourceWithName(release, redisRelease)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot unmarshal sts deleter job: %w", err))
}
}

// The job hasn't been observed yet, so we need to keep it in desired, or we will have a recreate loop
// Also as long as it hasn't finished we need to make sure it exists.
if (!observedJob || deletionJob.Status.Succeeded < 1) || (sts.Status.ReadyReplicas == 0 && !stsUpdated) {
err := addDeletionJob(svc, comp, newSize, release.GetName(), replicaKey)
if err != nil {
return runtime.NewFatalResult(fmt.Errorf("cannot create the deletion job: %w", err))
return runtime.NewFatalResult(fmt.Errorf("Can't pause the release: %w", err))
}
return nil
}

return nil
Expand All @@ -140,7 +165,7 @@ func addStsObserver(svc *runtime.ServiceRuntime, comp *spksv1alpha1.CompositeRed

providerConfigRef := comp.GetLabels()["service.syn.tools/cluster"]

return svc.SetDesiredKubeObjectWithName(statefulset, comp.GetName()+"-sts-observer", "sts-observer", KubeOptionAddProviderRef(providerConfigRef, true))
return svc.SetDesiredKubeObject(statefulset, comp.GetName()+"-sts-observer", KubeOptionAddProviderRef(providerConfigRef, true))
}

func needReleasePatch(values map[string]interface{}, stsSize int64, replicaKey string) (bool, string, *xfnproto.Result) {
Expand Down
Loading

0 comments on commit a063a3b

Please sign in to comment.