Skip to content

Commit

Permalink
new: added Pipeline spec hash to PipelineRollout annotations.
Browse files Browse the repository at this point in the history
Signed-off-by: Antonino Fugazzotto <[email protected]>
  • Loading branch information
afugazzotto committed Jun 10, 2024
1 parent 2ccd4cd commit 2d33a6e
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 37 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ bin/*
go.work

config/crd/bases/_.yaml
.idea
.idea

config/crd/external/
52 changes: 19 additions & 33 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -155,35 +154,22 @@ func (r *PipelineRolloutReconciler) reconcile(
controllerutil.AddFinalizer(pipelineRollout, finalizerName)
}

// apply Pipeline
// todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated
obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "Pipeline",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRollout.Name,
Namespace: pipelineRollout.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)},
},
Spec: pipelineRollout.Spec.Pipeline,
// make a Pipeline object and add/update spec hash on the PipelineRollout object
obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, pipelineRollout)
if err != nil {
numaLogger.Errorf(err, "failed to make a Pipeline object and to update the PipelineRollout: %v", err)
return false, err
}

// Get the object to see if it exists
_, err := kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
// create object as it doesn't exist
if apierrors.IsNotFound(err) {
err = kubernetes.CreateCR(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
return false, err
}
if rolloutChildOp == RolloutChildNew {
err = kubernetes.CreateCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
return false, err
}
} else {
} else if rolloutChildOp == RolloutChildUpdate {
// If the pipeline already exists, first check if the pipeline status
// is pausing. If so, re-enqueue immediately.
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines")
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
numaLogger.Errorf(err, "failed to get Pipeline: %v", err)
return false, err
Expand All @@ -201,13 +187,13 @@ func (r *PipelineRolloutReconciler) reconcile(
// Apply the new spec and resume the pipeline
// TODO: in the future, need to take into account whether Numaflow Controller
// or ISBService is being installed to determine whether it's safe to unpause
newObj, err := setPipelineDesiredStatus(&obj, "Running")
newObj, err := setPipelineDesiredStatus(obj, "Running")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
Expand All @@ -216,28 +202,28 @@ func (r *PipelineRolloutReconciler) reconcile(
}

// If pipeline status is not above, detect if pausing is required.
shouldPause, err := needsPausing(pipeline, &obj)
shouldPause, err := needsPausing(pipeline, obj)
if err != nil {
return false, err
}
if shouldPause {
// Use the existing spec, then pause and re-enqueue
obj.Spec = pipeline.Spec
newObj, err := setPipelineDesiredStatus(&obj, "Paused")
newObj, err := setPipelineDesiredStatus(obj, "Paused")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
return true, err
}

// If no need to pause, just apply the spec
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/pipelinerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,16 @@ var _ = Describe("PipelineRollout Controller", func() {
It("Should not create the PipelineRollout", func() {
Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{
Spec: pipelineRollout.Spec,
})).To(HaveOccurred())
})).ShouldNot(Succeed())

Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{
ObjectMeta: pipelineRollout.ObjectMeta,
})).To(HaveOccurred())
})).ShouldNot(Succeed())

Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{
ObjectMeta: pipelineRollout.ObjectMeta,
Spec: apiv1.PipelineRolloutSpec{},
})).To(HaveOccurred())
})).ShouldNot(Succeed())
})
})
})
120 changes: 120 additions & 0 deletions internal/controller/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"errors"
"fmt"

"github.com/numaproj/numaplane/internal/kubernetes"
"github.com/numaproj/numaplane/internal/util"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
)

type RolloutChildOperation string

const (
// The child resource does not exist on the cluster and needs to be created
RolloutChildNew RolloutChildOperation = "CREATE_CHILD_RESOURCE"
// The child resource exists on the cluster but needs to be update
RolloutChildUpdate RolloutChildOperation = "UPDATE_CHILD_RESOURCE"
// The child resource exists on the cluster and does not need to be update
RolloutChildNone RolloutChildOperation = "DO_NOTHING"
)

// makeChildResourceFromRolloutAndUpdateSpecHash makes a new kubernetes.GenericObject based on the given rolloutObj.
// It returns the child resource object ready to be created and an operation to be performed with the returned object.
// The operations are defined by the RolloutChildOperation constants.
func makeChildResourceFromRolloutAndUpdateSpecHash(
ctx context.Context,
restConfig *rest.Config,
rolloutObj metav1.Object,
) (*kubernetes.GenericObject, RolloutChildOperation, error) {
kind := ""
pluralName := ""
var groupVersionKind schema.GroupVersionKind
var childResourceSpec runtime.RawExtension

// TODO: LOW PRIORITY: alternatively, consider passing kind, pluralName, groupVersionKind, and childResourceSpec as arguments
switch ro := rolloutObj.(type) {
case *apiv1.PipelineRollout:
kind = "Pipeline"
pluralName = "pipelines"
groupVersionKind = apiv1.PipelineRolloutGroupVersionKind
childResourceSpec = ro.Spec.Pipeline
case *apiv1.ISBServiceRollout:
kind = "InterStepBufferService"
pluralName = "interstepbufferservices"
groupVersionKind = apiv1.ISBServiceRolloutGroupVersionKind
childResourceSpec = ro.Spec.InterStepBufferService
default:
return nil, RolloutChildNone, errors.New("invalid rollout type")
}

obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: kind,
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: rolloutObj.GetName(),
Namespace: rolloutObj.GetNamespace(),
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rolloutObj, groupVersionKind)},
},
}

childResouceSpecHash := util.MustHash(childResourceSpec)

rolloutChildOp := RolloutChildNone
_, err := kubernetes.GetCR(ctx, restConfig, &obj, pluralName)
if err != nil {
if apierrors.IsNotFound(err) {
rolloutChildOp = RolloutChildNew
} else {
return nil, RolloutChildNone, fmt.Errorf("unable to get %s %s/%s: %v", kind, obj.Namespace, obj.Name, err)
}
}

if rolloutChildOp == RolloutChildNone {
annotations := rolloutObj.GetAnnotations()
if annotation, exists := annotations[apiv1.KeyHash]; exists && annotation != childResouceSpecHash {
rolloutChildOp = RolloutChildUpdate
}
}

setAnnotation(rolloutObj, apiv1.KeyHash, childResouceSpecHash)
obj.Spec = childResourceSpec

return &obj, rolloutChildOp, nil
}

func setAnnotation(obj metav1.Object, key, value string) {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}

annotations[key] = value

obj.SetAnnotations(annotations)
}
46 changes: 46 additions & 0 deletions internal/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2024 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
)

func MustHash(v any) string {
switch data := v.(type) {
case []byte:
hash := sha256.New()
if _, err := hash.Write(data); err != nil {
panic(err)
}
return hex.EncodeToString(hash.Sum(nil))
case string:
return MustHash([]byte(data))
default:
return MustHash([]byte(MustJSON(v)))
}
}

func MustJSON(in any) string {
if data, err := json.Marshal(in); err != nil {
panic(err)
} else {
return string(data)
}
}
21 changes: 21 additions & 0 deletions pkg/apis/numaplane/v1alpha1/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2024 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

const (
KeyHash = "numaplane.numaproj.io/hash"
)

0 comments on commit 2d33a6e

Please sign in to comment.