diff --git a/.gitignore b/.gitignore index bbb976ce..23743220 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,6 @@ bin/* go.work config/crd/bases/_.yaml -.idea \ No newline at end of file +.idea + +config/crd/external/ diff --git a/internal/controller/pipelinerollout_controller.go b/internal/controller/pipelinerollout_controller.go index aba84e28..33ef3ef0 100644 --- a/internal/controller/pipelinerollout_controller.go +++ b/internal/controller/pipelinerollout_controller.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" @@ -155,35 +154,22 @@ func (r *PipelineRolloutReconciler) reconcile( controllerutil.AddFinalizer(pipelineRollout, finalizerName) } - // apply Pipeline - // todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated - obj := kubernetes.GenericObject{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pipeline", - APIVersion: "numaflow.numaproj.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: pipelineRollout.Name, - Namespace: pipelineRollout.Namespace, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)}, - }, - Spec: pipelineRollout.Spec.Pipeline, + // make a Pipeline object and add/update spec hash on the PipelineRollout object + obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, pipelineRollout) + if err != nil { + numaLogger.Errorf(err, "failed to make a Pipeline object and to update the PipelineRollout: %v", err) + return false, err } - // Get the object to see if it exists - _, err := kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines") - if err != nil { - // create object as it doesn't exist - if apierrors.IsNotFound(err) { - err = kubernetes.CreateCR(ctx, r.restConfig, &obj, "pipelines") - if err != nil { - return false, err - } + if rolloutChildOp == RolloutChildNew { + err = kubernetes.CreateCR(ctx, r.restConfig, obj, "pipelines") + if err != nil { + return false, err } - } else { + } else if rolloutChildOp == RolloutChildUpdate { // If the pipeline already exists, first check if the pipeline status // is pausing. If so, re-enqueue immediately. - pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines") + pipeline, err := kubernetes.GetCR(ctx, r.restConfig, obj, "pipelines") if err != nil { numaLogger.Errorf(err, "failed to get Pipeline: %v", err) return false, err @@ -201,13 +187,13 @@ func (r *PipelineRolloutReconciler) reconcile( // Apply the new spec and resume the pipeline // TODO: in the future, need to take into account whether Numaflow Controller // or ISBService is being installed to determine whether it's safe to unpause - newObj, err := setPipelineDesiredStatus(&obj, "Running") + newObj, err := setPipelineDesiredStatus(obj, "Running") if err != nil { return false, err } - obj = *newObj + obj = newObj - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } @@ -216,20 +202,20 @@ func (r *PipelineRolloutReconciler) reconcile( } // If pipeline status is not above, detect if pausing is required. - shouldPause, err := needsPausing(pipeline, &obj) + shouldPause, err := needsPausing(pipeline, obj) if err != nil { return false, err } if shouldPause { // Use the existing spec, then pause and re-enqueue obj.Spec = pipeline.Spec - newObj, err := setPipelineDesiredStatus(&obj, "Paused") + newObj, err := setPipelineDesiredStatus(obj, "Paused") if err != nil { return false, err } - obj = *newObj + obj = newObj - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } @@ -237,7 +223,7 @@ func (r *PipelineRolloutReconciler) reconcile( } // If no need to pause, just apply the spec - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } diff --git a/internal/controller/pipelinerollout_controller_test.go b/internal/controller/pipelinerollout_controller_test.go index 1ce2256f..ca15edf9 100644 --- a/internal/controller/pipelinerollout_controller_test.go +++ b/internal/controller/pipelinerollout_controller_test.go @@ -221,16 +221,16 @@ var _ = Describe("PipelineRollout Controller", func() { It("Should not create the PipelineRollout", func() { Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ Spec: pipelineRollout.Spec, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ ObjectMeta: pipelineRollout.ObjectMeta, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ ObjectMeta: pipelineRollout.ObjectMeta, Spec: apiv1.PipelineRolloutSpec{}, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) }) }) }) diff --git a/internal/controller/shared.go b/internal/controller/shared.go new file mode 100644 index 00000000..92bd7fdf --- /dev/null +++ b/internal/controller/shared.go @@ -0,0 +1,120 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "errors" + "fmt" + + "github.com/numaproj/numaplane/internal/kubernetes" + "github.com/numaproj/numaplane/internal/util" + apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" +) + +type RolloutChildOperation string + +const ( + // The child resource does not exist on the cluster and needs to be created + RolloutChildNew RolloutChildOperation = "CREATE_CHILD_RESOURCE" + // The child resource exists on the cluster but needs to be update + RolloutChildUpdate RolloutChildOperation = "UPDATE_CHILD_RESOURCE" + // The child resource exists on the cluster and does not need to be update + RolloutChildNone RolloutChildOperation = "DO_NOTHING" +) + +// makeChildResourceFromRolloutAndUpdateSpecHash makes a new kubernetes.GenericObject based on the given rolloutObj. +// It returns the child resource object ready to be created and an operation to be performed with the returned object. +// The operations are defined by the RolloutChildOperation constants. +func makeChildResourceFromRolloutAndUpdateSpecHash( + ctx context.Context, + restConfig *rest.Config, + rolloutObj metav1.Object, +) (*kubernetes.GenericObject, RolloutChildOperation, error) { + kind := "" + pluralName := "" + var groupVersionKind schema.GroupVersionKind + var childResourceSpec runtime.RawExtension + + // TODO: LOW PRIORITY: alternatively, consider passing kind, pluralName, groupVersionKind, and childResourceSpec as arguments + switch ro := rolloutObj.(type) { + case *apiv1.PipelineRollout: + kind = "Pipeline" + pluralName = "pipelines" + groupVersionKind = apiv1.PipelineRolloutGroupVersionKind + childResourceSpec = ro.Spec.Pipeline + case *apiv1.ISBServiceRollout: + kind = "InterStepBufferService" + pluralName = "interstepbufferservices" + groupVersionKind = apiv1.ISBServiceRolloutGroupVersionKind + childResourceSpec = ro.Spec.InterStepBufferService + default: + return nil, RolloutChildNone, errors.New("invalid rollout type") + } + + obj := kubernetes.GenericObject{ + TypeMeta: metav1.TypeMeta{ + Kind: kind, + APIVersion: "numaflow.numaproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: rolloutObj.GetName(), + Namespace: rolloutObj.GetNamespace(), + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rolloutObj, groupVersionKind)}, + }, + } + + childResouceSpecHash := util.MustHash(childResourceSpec) + + rolloutChildOp := RolloutChildNone + _, err := kubernetes.GetCR(ctx, restConfig, &obj, pluralName) + if err != nil { + if apierrors.IsNotFound(err) { + rolloutChildOp = RolloutChildNew + } else { + return nil, RolloutChildNone, fmt.Errorf("unable to get %s %s/%s: %v", kind, obj.Namespace, obj.Name, err) + } + } + + if rolloutChildOp == RolloutChildNone { + annotations := rolloutObj.GetAnnotations() + if annotation, exists := annotations[apiv1.KeyHash]; exists && annotation != childResouceSpecHash { + rolloutChildOp = RolloutChildUpdate + } + } + + setAnnotation(rolloutObj, apiv1.KeyHash, childResouceSpecHash) + obj.Spec = childResourceSpec + + return &obj, rolloutChildOp, nil +} + +func setAnnotation(obj metav1.Object, key, value string) { + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + annotations[key] = value + + obj.SetAnnotations(annotations) +} diff --git a/internal/util/util.go b/internal/util/util.go new file mode 100644 index 00000000..0d84bb1f --- /dev/null +++ b/internal/util/util.go @@ -0,0 +1,46 @@ +/* +Copyright 2024 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +func MustHash(v any) string { + switch data := v.(type) { + case []byte: + hash := sha256.New() + if _, err := hash.Write(data); err != nil { + panic(err) + } + return hex.EncodeToString(hash.Sum(nil)) + case string: + return MustHash([]byte(data)) + default: + return MustHash([]byte(MustJSON(v))) + } +} + +func MustJSON(in any) string { + if data, err := json.Marshal(in); err != nil { + panic(err) + } else { + return string(data) + } +} diff --git a/pkg/apis/numaplane/v1alpha1/const.go b/pkg/apis/numaplane/v1alpha1/const.go new file mode 100644 index 00000000..98d45b0b --- /dev/null +++ b/pkg/apis/numaplane/v1alpha1/const.go @@ -0,0 +1,21 @@ +/* +Copyright 2024 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + KeyHash = "numaplane.numaproj.io/hash" +)