diff --git a/.gitignore b/.gitignore index bbb976ce..23743220 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,6 @@ bin/* go.work config/crd/bases/_.yaml -.idea \ No newline at end of file +.idea + +config/crd/external/ diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index afe16aa7..0810d5ad 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" @@ -33,7 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaplane/internal/kubernetes" + "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/numaproj/numaplane/internal/util/logger" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) @@ -77,7 +76,7 @@ func NewISBServiceRolloutReconciler( func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // update the Base Logger's level according to the Numaplane Config logger.RefreshBaseLoggerLevel() - numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("isbservicerollout", req.NamespacedName) + numaLogger := logger.GetBaseLogger().WithName("isbservicerollout-reconciler").WithValues("isbservicerollout", req.NamespacedName) isbServiceRollout := &apiv1.ISBServiceRollout{} if err := r.client.Get(ctx, req.NamespacedName, isbServiceRollout); err != nil { @@ -143,29 +142,28 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR controllerutil.AddFinalizer(isbServiceRollout, finalizerName) } - // apply ISBService - // todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated - obj := kubernetes.GenericObject{ - TypeMeta: metav1.TypeMeta{ - Kind: "InterStepBufferService", - APIVersion: "numaflow.numaproj.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: isbServiceRollout.Name, - Namespace: isbServiceRollout.Namespace, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(isbServiceRollout.GetObjectMeta(), apiv1.ISBServiceRolloutGroupVersionKind)}, - }, - Spec: isbServiceRollout.Spec.InterStepBufferService, + // make an InterStepBufferService object and add/update spec hash on the ISBServiceRollout object + obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, isbServiceRollout) + if err != nil { + numaLogger.Errorf(err, "failed to make an InterStepBufferService object and to update the ISBServiceRollout: %v", err) + return err + } + + // TODO: instead of doing this, modify the ApplyCRSpec below to be similar to what is done on the PipelineRollout controller code + if rolloutChildOp == RolloutChildNoop { + numaLogger.Debug("InterStepBufferService spec is unchanged. No updates will be performed") + return nil } - err := kubernetes.ApplyCRSpec(ctx, r.restConfig, &obj, "interstepbufferservices") + err = kubernetes.ApplyCRSpec(ctx, r.restConfig, obj, "interstepbufferservices") if err != nil { numaLogger.Errorf(err, "failed to apply CR: %v", err) isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error()) return err } + // after the Apply, Get the ISBService so that we can propagate its health into our Status - isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "interstepbufferservices") + isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, obj, "interstepbufferservices") if err != nil { numaLogger.Errorf(err, "failed to get ISBServices: %v", err) return err @@ -206,7 +204,9 @@ func (r *ISBServiceRolloutReconciler) needsUpdate(old, new *apiv1.ISBServiceRoll // check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata // TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { + // Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made. + // However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"])) + if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) { return true } return false diff --git a/internal/controller/numaflowcontrollerrollout_controller.go b/internal/controller/numaflowcontrollerrollout_controller.go index 5682c293..0def6777 100644 --- a/internal/controller/numaflowcontrollerrollout_controller.go +++ b/internal/controller/numaflowcontrollerrollout_controller.go @@ -46,7 +46,7 @@ import ( ) const ( - finalizerName = "numaplane-controller" + finalizerName = "numaplane.numaproj.io/numaplane-controller" ) const ( @@ -120,7 +120,7 @@ func loadDefinitions() (map[string]string, error) { func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // update the Base Logger's level according to the Numaplane Config logger.RefreshBaseLoggerLevel() - numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName) + numaLogger := logger.GetBaseLogger().WithName("numaflowcontrollerrollout-reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName) // TODO: only allow one controllerRollout per namespace. numaflowControllerRollout := &apiv1.NumaflowControllerRollout{} diff --git a/internal/controller/pipelinerollout_controller.go b/internal/controller/pipelinerollout_controller.go index aba84e28..27efdb4c 100644 --- a/internal/controller/pipelinerollout_controller.go +++ b/internal/controller/pipelinerollout_controller.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" @@ -35,7 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" - "github.com/numaproj/numaplane/internal/kubernetes" + "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/numaproj/numaplane/internal/util/logger" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) @@ -79,7 +78,7 @@ func NewPipelineRolloutReconciler( func (r *PipelineRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // update the Base Logger's level according to the Numaplane Config logger.RefreshBaseLoggerLevel() - numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("pipelinerollout", req.NamespacedName) + numaLogger := logger.GetBaseLogger().WithName("pipelinerollout-reconciler").WithValues("pipelinerollout", req.NamespacedName) // update the context with this Logger so downstream users can incorporate these values in the logs ctx = logger.WithLogger(ctx, numaLogger) @@ -155,35 +154,22 @@ func (r *PipelineRolloutReconciler) reconcile( controllerutil.AddFinalizer(pipelineRollout, finalizerName) } - // apply Pipeline - // todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated - obj := kubernetes.GenericObject{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pipeline", - APIVersion: "numaflow.numaproj.io/v1alpha1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: pipelineRollout.Name, - Namespace: pipelineRollout.Namespace, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)}, - }, - Spec: pipelineRollout.Spec.Pipeline, + // make a Pipeline object and add/update spec hash on the PipelineRollout object + obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, pipelineRollout) + if err != nil { + numaLogger.Errorf(err, "failed to make a Pipeline object and to update the PipelineRollout: %v", err) + return false, err } - // Get the object to see if it exists - _, err := kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines") - if err != nil { - // create object as it doesn't exist - if apierrors.IsNotFound(err) { - err = kubernetes.CreateCR(ctx, r.restConfig, &obj, "pipelines") - if err != nil { - return false, err - } + if rolloutChildOp == RolloutChildNew { + err = kubernetes.CreateCR(ctx, r.restConfig, obj, "pipelines") + if err != nil { + return false, err } - } else { + } else if rolloutChildOp == RolloutChildUpdate { // If the pipeline already exists, first check if the pipeline status // is pausing. If so, re-enqueue immediately. - pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines") + pipeline, err := kubernetes.GetCR(ctx, r.restConfig, obj, "pipelines") if err != nil { numaLogger.Errorf(err, "failed to get Pipeline: %v", err) return false, err @@ -201,13 +187,13 @@ func (r *PipelineRolloutReconciler) reconcile( // Apply the new spec and resume the pipeline // TODO: in the future, need to take into account whether Numaflow Controller // or ISBService is being installed to determine whether it's safe to unpause - newObj, err := setPipelineDesiredStatus(&obj, "Running") + newObj, err := setPipelineDesiredStatus(obj, "Running") if err != nil { return false, err } - obj = *newObj + obj = newObj - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } @@ -216,20 +202,20 @@ func (r *PipelineRolloutReconciler) reconcile( } // If pipeline status is not above, detect if pausing is required. - shouldPause, err := needsPausing(pipeline, &obj) + shouldPause, err := needsPausing(pipeline, obj) if err != nil { return false, err } if shouldPause { // Use the existing spec, then pause and re-enqueue obj.Spec = pipeline.Spec - newObj, err := setPipelineDesiredStatus(&obj, "Paused") + newObj, err := setPipelineDesiredStatus(obj, "Paused") if err != nil { return false, err } - obj = *newObj + obj = newObj - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } @@ -237,10 +223,12 @@ func (r *PipelineRolloutReconciler) reconcile( } // If no need to pause, just apply the spec - err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout) + err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout) if err != nil { return false, err } + } else { + numaLogger.Debug("Pipeline spec is unchanged. No updates will be performed") } pipelineRollout.Status.MarkRunning() @@ -277,7 +265,9 @@ func (r *PipelineRolloutReconciler) needsUpdate(old, new *apiv1.PipelineRollout) // check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata // TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { + // Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made. + // However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"])) + if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) { return true } return false diff --git a/internal/controller/pipelinerollout_controller_test.go b/internal/controller/pipelinerollout_controller_test.go index 1ce2256f..31b61905 100644 --- a/internal/controller/pipelinerollout_controller_test.go +++ b/internal/controller/pipelinerollout_controller_test.go @@ -19,17 +19,22 @@ package controller import ( "context" "encoding/json" + "testing" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/rest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaplane/internal/util" + "github.com/numaproj/numaplane/internal/util/kubernetes" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" ) @@ -122,6 +127,19 @@ var _ = Describe("PipelineRollout Controller", func() { By("Verifying the content of the pipeline spec field") Expect(createdPipelineRolloutPipelineSpec).Should(Equal(pipelineSpec)) + + By("Verifying the spec hash stored in the PipelineRollout annotations after creation") + var pipelineSpecAsMap map[string]any + Expect(json.Unmarshal(pipelineSpecRaw, &pipelineSpecAsMap)).ToNot(HaveOccurred()) + pipelineSpecHash := util.MustHash(pipelineSpecAsMap) + Eventually(func() (string, error) { + createdResource := &apiv1.PipelineRollout{} + err := k8sClient.Get(ctx, resourceLookupKey, createdResource) + if err != nil { + return "", err + } + return createdResource.Annotations[apiv1.KeyHash], nil + }, timeout, interval).Should(Equal(pipelineSpecHash)) }) It("Should create a Numaflow Pipeline", func() { @@ -152,6 +170,24 @@ var _ = Describe("PipelineRollout Controller", func() { currentPipelineRollout := &apiv1.PipelineRollout{} Expect(k8sClient.Get(ctx, resourceLookupKey, currentPipelineRollout)).ToNot(HaveOccurred()) + var lastTransitionTime time.Time + Eventually(func() (time.Time, error) { + currentResource := &apiv1.PipelineRollout{} + err := k8sClient.Get(ctx, resourceLookupKey, currentResource) + if err != nil { + return time.Time{}, err + } + + for _, cond := range currentPipelineRollout.Status.Conditions { + if cond.Type == string(apiv1.ConditionConfigured) { + lastTransitionTime = cond.LastTransitionTime.Time + return lastTransitionTime, nil + } + } + + return time.Time{}, nil + }, timeout, interval).Should(Not(Equal(time.Time{}))) + pipelineSpec.InterStepBufferServiceName = "my-isbsvc-updated" pipelineSpecRaw, err := json.Marshal(pipelineSpec) Expect(err).ToNot(HaveOccurred()) @@ -183,6 +219,60 @@ var _ = Describe("PipelineRollout Controller", func() { } return updatedChildResource.Spec, nil }, timeout, interval).Should(Equal(pipelineSpec)) + + By("Verifying the spec hash stored in the PipelineRollout annotations after update") + var pipelineSpecAsMap map[string]any + Expect(json.Unmarshal(pipelineSpecRaw, &pipelineSpecAsMap)).ToNot(HaveOccurred()) + pipelineSpecHash := util.MustHash(pipelineSpecAsMap) + Eventually(func() (string, error) { + updatedResource := &apiv1.PipelineRollout{} + err := k8sClient.Get(ctx, resourceLookupKey, updatedResource) + if err != nil { + return "", err + } + return updatedResource.Annotations[apiv1.KeyHash], nil + }, timeout, interval).Should(Equal(pipelineSpecHash)) + + By("Verifying the LastTransitionTime of the Configured condition of the PipelineRollout is after the time of the initial configuration") + Eventually(func() (bool, error) { + updatedResource := &apiv1.PipelineRollout{} + err := k8sClient.Get(ctx, resourceLookupKey, updatedResource) + if err != nil { + return false, err + } + + for _, cond := range updatedResource.Status.Conditions { + if cond.Type == string(apiv1.ConditionConfigured) { + isAfter := cond.LastTransitionTime.Time.After(lastTransitionTime) + lastTransitionTime = cond.LastTransitionTime.Time + return isAfter, nil + } + } + + return false, nil + }, time.Second, interval).Should(BeTrue()) + + By("Verifying that the same PipelineRollout should not perform and update (no Configuration condition LastTransitionTime change) and the hash spec annotation should not change") + Expect(k8sClient.Get(ctx, resourceLookupKey, currentPipelineRollout)).ToNot(HaveOccurred()) + Expect(k8sClient.Update(ctx, currentPipelineRollout)).ToNot(HaveOccurred()) + Eventually(func() (bool, error) { + updatedResource := &apiv1.PipelineRollout{} + err := k8sClient.Get(ctx, resourceLookupKey, updatedResource) + if err != nil { + return false, err + } + + equalHash := updatedResource.Annotations[apiv1.KeyHash] == pipelineSpecHash + + for _, cond := range updatedResource.Status.Conditions { + if cond.Type == string(apiv1.ConditionConfigured) { + equalTime := cond.LastTransitionTime.Time.Equal(lastTransitionTime) + return equalTime && equalHash, nil + } + } + + return false, nil + }, timeout, interval).Should(BeTrue()) }) It("Should delete the PipelineRollout and Numaflow Pipeline", func() { @@ -221,16 +311,89 @@ var _ = Describe("PipelineRollout Controller", func() { It("Should not create the PipelineRollout", func() { Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ Spec: pipelineRollout.Spec, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ ObjectMeta: pipelineRollout.ObjectMeta, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) Expect(k8sClient.Create(ctx, &apiv1.PipelineRollout{ ObjectMeta: pipelineRollout.ObjectMeta, Spec: apiv1.PipelineRolloutSpec{}, - })).To(HaveOccurred()) + })).ShouldNot(Succeed()) }) }) }) + +func Test_makeChildResourceFromRolloutAndUpdateSpecHash_InvalidType(t *testing.T) { + ctx := context.Background() + restConfig := &rest.Config{} + + invalidType := kubernetes.GenericObject{} + + _, _, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, restConfig, &invalidType) + + assert.Error(t, err) + assert.Equal(t, "invalid rollout type", err.Error()) +} + +func Test_makeChildResourceFromRolloutAndUpdateSpecHash_PipelineRollout_UnmarshalError(t *testing.T) { + ctx := context.Background() + restConfig := &rest.Config{} + + pipelineRolloutInvalidSpec := &apiv1.PipelineRollout{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipeline-test", + Namespace: "default", + }, + Spec: apiv1.PipelineRolloutSpec{ + // providing invalid JSON for unmarshal error + Pipeline: runtime.RawExtension{Raw: []byte(`{"key":"value"`)}, + }, + } + + _, _, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, restConfig, pipelineRolloutInvalidSpec) + assert.Error(t, err) +} + +func Test_setAnnotation(t *testing.T) { + key1, value1 := "some_key_1", "some_value_1" + key2, value2 := "some_key_2", "some_value_2" + + pipelineRollout := &apiv1.PipelineRollout{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pipeline-test", + Namespace: "default", + }, + Spec: apiv1.PipelineRolloutSpec{ + Pipeline: runtime.RawExtension{Raw: []byte(`{"key":"value"}`)}, + }, + } + + // Invoke the method under test + setAnnotation(pipelineRollout, key1, value1) + + // Check if the annotation was correctly set + annotations := pipelineRollout.GetAnnotations() + assert.NotNil(t, annotations, "Expected annotations to be set on the pipelineRollout object") + assert.Contains(t, annotations, key1, "Expected the key to be set in the annotations") + assert.Equal(t, value1, annotations[key1], "Expected the value to be set for the key in the annotations") + + // Overwrite existing annotation + newValue := "new_value" + setAnnotation(pipelineRollout, key1, newValue) + + // Check if the annotation was correctly updated + annotations = pipelineRollout.GetAnnotations() + assert.NotNil(t, annotations, "Expected annotations to be set on the pipelineRollout object") + assert.Contains(t, annotations, key1, "Expected the key to be set in the annotations") + assert.NotEqual(t, value1, annotations[key1], "Expected the old value to be replaced") + assert.Equal(t, newValue, annotations[key1], "Expected the new value to be set for the key in the annotations") + + // Add one more annotation + setAnnotation(pipelineRollout, key2, value2) + assert.NotNil(t, annotations, "Expected annotations to be set on the pipelineRollout object") + assert.Len(t, annotations, 2, "Expected annotations to be of length 2") + assert.Contains(t, annotations, key2, "Expected the key to be set in the annotations") + assert.Equal(t, value2, annotations[key2], "Expected the value to be set for the key in the annotations") +} diff --git a/internal/controller/shared.go b/internal/controller/shared.go new file mode 100644 index 00000000..5eebfc4a --- /dev/null +++ b/internal/controller/shared.go @@ -0,0 +1,126 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/numaproj/numaplane/internal/util" + "github.com/numaproj/numaplane/internal/util/kubernetes" + apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" +) + +type RolloutChildOperation string + +const ( + // The child resource does not exist on the cluster and needs to be created + RolloutChildNew RolloutChildOperation = "CREATE_CHILD_RESOURCE" + // The child resource exists on the cluster but needs to be update + RolloutChildUpdate RolloutChildOperation = "UPDATE_CHILD_RESOURCE" + // The child resource exists on the cluster and does not need to be update + RolloutChildNoop RolloutChildOperation = "DO_NOTHING" +) + +// makeChildResourceFromRolloutAndUpdateSpecHash makes a new kubernetes.GenericObject based on the given rolloutObj. +// It returns the child resource object ready to be created and an operation to be performed with the returned object. +// The operations are defined by the RolloutChildOperation constants. +func makeChildResourceFromRolloutAndUpdateSpecHash( + ctx context.Context, + restConfig *rest.Config, + rolloutObj metav1.Object, +) (*kubernetes.GenericObject, RolloutChildOperation, error) { + kind := "" + pluralName := "" + var groupVersionKind schema.GroupVersionKind + var childResourceSpec runtime.RawExtension + + // TODO: LOW PRIORITY: alternatively, consider passing kind, pluralName, groupVersionKind, and childResourceSpec as arguments + switch ro := rolloutObj.(type) { + case *apiv1.PipelineRollout: + kind = "Pipeline" + pluralName = "pipelines" + groupVersionKind = apiv1.PipelineRolloutGroupVersionKind + childResourceSpec = ro.Spec.Pipeline + case *apiv1.ISBServiceRollout: + kind = "InterStepBufferService" + pluralName = "interstepbufferservices" + groupVersionKind = apiv1.ISBServiceRolloutGroupVersionKind + childResourceSpec = ro.Spec.InterStepBufferService + default: + return nil, RolloutChildNoop, errors.New("invalid rollout type") + } + + obj := kubernetes.GenericObject{ + TypeMeta: metav1.TypeMeta{ + Kind: kind, + APIVersion: "numaflow.numaproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: rolloutObj.GetName(), + Namespace: rolloutObj.GetNamespace(), + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rolloutObj, groupVersionKind)}, + }, + } + + var childResourceSpecAsMap map[string]any + err := json.Unmarshal(childResourceSpec.Raw, &childResourceSpecAsMap) + if err != nil { + return nil, RolloutChildNoop, fmt.Errorf("unable to unmarshal %s spec to map: %v", kind, err) + } + childResouceSpecHash := util.MustHash(childResourceSpecAsMap) + + rolloutChildOp := RolloutChildNoop + _, err = kubernetes.GetCR(ctx, restConfig, &obj, pluralName) + if err != nil { + if apierrors.IsNotFound(err) { + rolloutChildOp = RolloutChildNew + } else { + return nil, RolloutChildNoop, fmt.Errorf("unable to get %s %s/%s: %v", kind, obj.Namespace, obj.Name, err) + } + } + + if rolloutChildOp == RolloutChildNoop { + annotations := rolloutObj.GetAnnotations() + if annotation, exists := annotations[apiv1.KeyHash]; exists && annotation != childResouceSpecHash { + rolloutChildOp = RolloutChildUpdate + } + } + + setAnnotation(rolloutObj, apiv1.KeyHash, childResouceSpecHash) + obj.Spec = childResourceSpec + + return &obj, rolloutChildOp, nil +} + +func setAnnotation(obj metav1.Object, key, value string) { + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + annotations[key] = value + + obj.SetAnnotations(annotations) +} diff --git a/internal/kubernetes/dynamic_client.go b/internal/kubernetes/dynamic_client.go deleted file mode 100644 index 2720a9b2..00000000 --- a/internal/kubernetes/dynamic_client.go +++ /dev/null @@ -1,221 +0,0 @@ -package kubernetes - -import ( - "context" - "encoding/json" - "fmt" - "strings" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - - "k8s.io/client-go/dynamic" - "k8s.io/client-go/rest" - - "github.com/numaproj/numaplane/internal/util/logger" -) - -//todo: add unit tests - -func parseApiVersion(apiVersion string) (string, string, error) { - // should be separated by slash - index := strings.Index(apiVersion, "/") - if index == -1 { - // if there's no slash, it's just the version, and the group should be "core" - return "core", apiVersion, nil - } else if index == len(apiVersion)-1 { - return "", "", fmt.Errorf("apiVersion incorrectly formatted: unexpected slash at end: %q", apiVersion) - } - return apiVersion[0:index], apiVersion[index+1:], nil -} - -type GenericObject struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` - - Spec runtime.RawExtension `json:"spec"` - Status runtime.RawExtension `json:"status,omitempty"` -} - -type GenericStatus struct { - Phase string `json:"phase,omitempty"` - Conditions []metav1.Condition `json:"conditions,omitempty"` -} - -func ParseStatus(obj *GenericObject) (GenericStatus, error) { - if len(obj.Status.Raw) == 0 { - return GenericStatus{}, nil - } - - var status GenericStatus - err := json.Unmarshal(obj.Status.Raw, &status) - if err != nil { - return GenericStatus{}, err - } - return status, nil -} - -func GetResource( - ctx context.Context, - restConfig *rest.Config, - object *GenericObject, - pluralName string, -) (*unstructured.Unstructured, error) { - client, err := dynamic.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to create dynamic client: %v", err) - } - - group, version, err := parseApiVersion(object.APIVersion) - if err != nil { - return nil, err - } - - gvr := schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: pluralName, - } - - return client.Resource(gvr).Namespace(object.Namespace).Get(ctx, object.Name, v1.GetOptions{}) -} - -// ApplyCRSpec either creates or updates an object identified by the RawExtension, using the new definition, -// first checking to see if there's a difference in Spec before applying -// TODO: use CreateCR and UpdateCR instead -func ApplyCRSpec(ctx context.Context, restConfig *rest.Config, object *GenericObject, pluralName string) error { - numaLogger := logger.FromContext(ctx) - - client, err := dynamic.NewForConfig(restConfig) - if err != nil { - return fmt.Errorf("failed to create dynamic client: %v", err) - } - - group, version, err := parseApiVersion(object.APIVersion) - if err != nil { - return err - } - - gvr := schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: pluralName, - } - - // Get the object to see if it exists - resource, err := client.Resource(gvr).Namespace(object.Namespace).Get(ctx, object.Name, v1.GetOptions{}) - - if err != nil { - if apierrors.IsNotFound(err) { - // create object as it doesn't exist - numaLogger.Debugf("didn't find resource %s/%s, will create", object.Namespace, object.Name) - - unstruct, err := ObjectToUnstructured(object) - if err != nil { - return err - } - - _, err = client.Resource(gvr).Namespace(object.Namespace).Create(ctx, unstruct, v1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create Resource %s/%s, err=%v", object.Namespace, object.Name, err) - } - numaLogger.Debugf("successfully created resource %s/%s", object.Namespace, object.Name) - } else { - return fmt.Errorf("error attempting to Get resources; GVR=%+v err: %v", gvr, err) - } - - } else { - numaLogger.Debugf("found existing Resource definition for %s/%s: %+v", object.Namespace, object.Name, resource) - // todo: - // If the existing annotation matches the new hash, then nothing to do: log and return - - // replace the Object's Spec - resource.Object["spec"] = object.Spec - - _, err = client.Resource(gvr).Namespace(object.Namespace).Update(ctx, resource, v1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update Resource %s/%s, err=%v", object.Namespace, object.Name, err) - } - numaLogger.Debugf("successfully updated resource %s/%s", object.Namespace, object.Name) - - } - return nil -} - -// look up a Resource -func GetCR(ctx context.Context, restConfig *rest.Config, object *GenericObject, pluralName string) (*GenericObject, error) { - unstruc, err := GetResource(ctx, restConfig, object, pluralName) - if unstruc != nil { - return UnstructuredToObject(unstruc) - } else { - return nil, err - } -} - -func CreateCR( - ctx context.Context, - restConfig *rest.Config, - object *GenericObject, - pluralName string, -) error { - numaLogger := logger.FromContext(ctx) - - client, err := dynamic.NewForConfig(restConfig) - if err != nil { - return fmt.Errorf("failed to create dynamic client: %v", err) - } - - group, version, err := parseApiVersion(object.APIVersion) - if err != nil { - return err - } - - gvr := schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: pluralName, - } - - numaLogger.Debugf("didn't find resource %s/%s, will create", object.Namespace, object.Name) - - unstruct, err := ObjectToUnstructured(object) - if err != nil { - return err - } - - _, err = client.Resource(gvr).Namespace(object.Namespace).Create(ctx, unstruct, v1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create Resource %s/%s, err=%v", object.Namespace, object.Name, err) - } - numaLogger.Debugf("successfully created resource %s/%s", object.Namespace, object.Name) - return nil -} - -func ObjectToUnstructured(object *GenericObject) (*unstructured.Unstructured, error) { - asJsonBytes, err := json.Marshal(object) - if err != nil { - return nil, err - } - var asMap map[string]interface{} - err = json.Unmarshal(asJsonBytes, &asMap) - if err != nil { - return nil, err - } - - return &unstructured.Unstructured{Object: asMap}, nil -} - -func UnstructuredToObject(u *unstructured.Unstructured) (*GenericObject, error) { - asJsonBytes, err := json.Marshal(u.Object) - if err != nil { - return nil, err - } - var genericObject GenericObject - err = json.Unmarshal(asJsonBytes, &genericObject) - - return &genericObject, err -} diff --git a/internal/util/kubernetes/kubernetes.go b/internal/util/kubernetes/kubernetes.go index 3b8855c1..15ebac5c 100644 --- a/internal/util/kubernetes/kubernetes.go +++ b/internal/util/kubernetes/kubernetes.go @@ -2,6 +2,7 @@ package kubernetes import ( "context" + "encoding/json" "fmt" "regexp" "strings" @@ -9,6 +10,7 @@ import ( "github.com/argoproj/gitops-engine/pkg/utils/kube" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" yamlserializer "k8s.io/apimachinery/pkg/runtime/serializer/yaml" "k8s.io/apimachinery/pkg/util/validation" @@ -17,11 +19,30 @@ import ( "github.com/numaproj/numaplane/internal/util/logger" "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" ) // validManifestExtensions contains the supported extension for raw file. var validManifestExtensions = map[string]struct{}{"yaml": {}, "yml": {}, "json": {}} +type GenericObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec runtime.RawExtension `json:"spec"` + Status runtime.RawExtension `json:"status,omitempty"` +} + +type GenericStatus struct { + Phase string `json:"phase,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + func IsValidKubernetesNamespace(name string) bool { // All namespace names must be valid RFC 1123 DNS labels. errs := validation.IsDNS1123Label(name) @@ -185,3 +206,189 @@ func ApplyOwnership(manifest string, controllerRollout *v1alpha1.NumaflowControl } return modifiedManifest, nil } + +func parseApiVersion(apiVersion string) (string, string, error) { + // should be separated by slash + index := strings.Index(apiVersion, "/") + if index == -1 { + // if there's no slash, it's just the version, and the group should be "core" + return "core", apiVersion, nil + } else if index == len(apiVersion)-1 { + return "", "", fmt.Errorf("apiVersion incorrectly formatted: unexpected slash at end: %q", apiVersion) + } + return apiVersion[0:index], apiVersion[index+1:], nil +} + +func ParseStatus(obj *GenericObject) (GenericStatus, error) { + if len(obj.Status.Raw) == 0 { + return GenericStatus{}, nil + } + + var status GenericStatus + err := json.Unmarshal(obj.Status.Raw, &status) + if err != nil { + return GenericStatus{}, err + } + return status, nil +} + +func GetResource( + ctx context.Context, + restConfig *rest.Config, + object *GenericObject, + pluralName string, +) (*unstructured.Unstructured, error) { + client, err := dynamic.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %v", err) + } + + group, version, err := parseApiVersion(object.APIVersion) + if err != nil { + return nil, err + } + + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: pluralName, + } + + return client.Resource(gvr).Namespace(object.Namespace).Get(ctx, object.Name, metav1.GetOptions{}) +} + +// ApplyCRSpec either creates or updates an object identified by the RawExtension, using the new definition, +// first checking to see if there's a difference in Spec before applying +// TODO: use CreateCR and UpdateCR instead +func ApplyCRSpec(ctx context.Context, restConfig *rest.Config, object *GenericObject, pluralName string) error { + numaLogger := logger.FromContext(ctx) + + client, err := dynamic.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %v", err) + } + + group, version, err := parseApiVersion(object.APIVersion) + if err != nil { + return err + } + + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: pluralName, + } + + // Get the object to see if it exists + resource, err := client.Resource(gvr).Namespace(object.Namespace).Get(ctx, object.Name, metav1.GetOptions{}) + + if err != nil { + if apierrors.IsNotFound(err) { + // create object as it doesn't exist + numaLogger.Debugf("didn't find resource %s/%s, will create", object.Namespace, object.Name) + + unstruct, err := ObjectToUnstructured(object) + if err != nil { + return err + } + + _, err = client.Resource(gvr).Namespace(object.Namespace).Create(ctx, unstruct, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Resource %s/%s, err=%v", object.Namespace, object.Name, err) + } + numaLogger.Debugf("successfully created resource %s/%s", object.Namespace, object.Name) + } else { + return fmt.Errorf("error attempting to Get resources; GVR=%+v err: %v", gvr, err) + } + + } else { + numaLogger.Debugf("found existing Resource definition for %s/%s: %+v", object.Namespace, object.Name, resource) + // todo: + // If the existing annotation matches the new hash, then nothing to do: log and return + + // replace the Object's Spec + resource.Object["spec"] = object.Spec + + _, err = client.Resource(gvr).Namespace(object.Namespace).Update(ctx, resource, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update Resource %s/%s, err=%v", object.Namespace, object.Name, err) + } + numaLogger.Debugf("successfully updated resource %s/%s", object.Namespace, object.Name) + + } + return nil +} + +// look up a Resource +func GetCR(ctx context.Context, restConfig *rest.Config, object *GenericObject, pluralName string) (*GenericObject, error) { + unstruc, err := GetResource(ctx, restConfig, object, pluralName) + if unstruc != nil { + return UnstructuredToObject(unstruc) + } else { + return nil, err + } +} + +func CreateCR( + ctx context.Context, + restConfig *rest.Config, + object *GenericObject, + pluralName string, +) error { + numaLogger := logger.FromContext(ctx) + + client, err := dynamic.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %v", err) + } + + group, version, err := parseApiVersion(object.APIVersion) + if err != nil { + return err + } + + gvr := schema.GroupVersionResource{ + Group: group, + Version: version, + Resource: pluralName, + } + + numaLogger.Debugf("didn't find resource %s/%s, will create", object.Namespace, object.Name) + + unstruct, err := ObjectToUnstructured(object) + if err != nil { + return err + } + + _, err = client.Resource(gvr).Namespace(object.Namespace).Create(ctx, unstruct, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create Resource %s/%s, err=%v", object.Namespace, object.Name, err) + } + numaLogger.Debugf("successfully created resource %s/%s", object.Namespace, object.Name) + return nil +} + +func ObjectToUnstructured(object *GenericObject) (*unstructured.Unstructured, error) { + asJsonBytes, err := json.Marshal(object) + if err != nil { + return nil, err + } + var asMap map[string]interface{} + err = json.Unmarshal(asJsonBytes, &asMap) + if err != nil { + return nil, err + } + + return &unstructured.Unstructured{Object: asMap}, nil +} + +func UnstructuredToObject(u *unstructured.Unstructured) (*GenericObject, error) { + asJsonBytes, err := json.Marshal(u.Object) + if err != nil { + return nil, err + } + var genericObject GenericObject + err = json.Unmarshal(asJsonBytes, &genericObject) + + return &genericObject, err +} diff --git a/internal/util/util.go b/internal/util/util.go new file mode 100644 index 00000000..03820c7b --- /dev/null +++ b/internal/util/util.go @@ -0,0 +1,49 @@ +/* +Copyright 2024 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +// MustHash returns a sha256 encoded string based on the given argument. +func MustHash(v any) string { + switch data := v.(type) { + case []byte: + hash := sha256.New() + if _, err := hash.Write(data); err != nil { + panic(err) + } + return hex.EncodeToString(hash.Sum(nil)) + case string: + return MustHash([]byte(data)) + default: + return MustHash([]byte(MustJSON(v))) + } +} + +// MustJSON makes sure the in argument is a valid JSON struct +// and returns its marshalled string version. +func MustJSON(in any) string { + if data, err := json.Marshal(in); err != nil { + panic(err) + } else { + return string(data) + } +} diff --git a/pkg/apis/numaplane/v1alpha1/const.go b/pkg/apis/numaplane/v1alpha1/const.go new file mode 100644 index 00000000..98d45b0b --- /dev/null +++ b/pkg/apis/numaplane/v1alpha1/const.go @@ -0,0 +1,21 @@ +/* +Copyright 2024 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + KeyHash = "numaplane.numaproj.io/hash" +)