Skip to content

Commit

Permalink
Spec hash annotation (#52)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonino Fugazzotto <[email protected]>
  • Loading branch information
afugazzotto authored Jun 12, 2024
1 parent 32a16d8 commit c708d3a
Show file tree
Hide file tree
Showing 10 changed files with 619 additions and 282 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ bin/*
go.work

config/crd/bases/_.yaml
.idea
.idea

config/crd/external/
38 changes: 19 additions & 19 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,7 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/kubernetes"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)
Expand Down Expand Up @@ -77,7 +76,7 @@ func NewISBServiceRolloutReconciler(
func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("isbservicerollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("isbservicerollout-reconciler").WithValues("isbservicerollout", req.NamespacedName)

isbServiceRollout := &apiv1.ISBServiceRollout{}
if err := r.client.Get(ctx, req.NamespacedName, isbServiceRollout); err != nil {
Expand Down Expand Up @@ -143,29 +142,28 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR
controllerutil.AddFinalizer(isbServiceRollout, finalizerName)
}

// apply ISBService
// todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated
obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "InterStepBufferService",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: isbServiceRollout.Name,
Namespace: isbServiceRollout.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(isbServiceRollout.GetObjectMeta(), apiv1.ISBServiceRolloutGroupVersionKind)},
},
Spec: isbServiceRollout.Spec.InterStepBufferService,
// make an InterStepBufferService object and add/update spec hash on the ISBServiceRollout object
obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, isbServiceRollout)
if err != nil {
numaLogger.Errorf(err, "failed to make an InterStepBufferService object and to update the ISBServiceRollout: %v", err)
return err
}

// TODO: instead of doing this, modify the ApplyCRSpec below to be similar to what is done on the PipelineRollout controller code
if rolloutChildOp == RolloutChildNoop {
numaLogger.Debug("InterStepBufferService spec is unchanged. No updates will be performed")
return nil
}

err := kubernetes.ApplyCRSpec(ctx, r.restConfig, &obj, "interstepbufferservices")
err = kubernetes.ApplyCRSpec(ctx, r.restConfig, obj, "interstepbufferservices")
if err != nil {
numaLogger.Errorf(err, "failed to apply CR: %v", err)
isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error())
return err
}

// after the Apply, Get the ISBService so that we can propagate its health into our Status
isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "interstepbufferservices")
isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, obj, "interstepbufferservices")
if err != nil {
numaLogger.Errorf(err, "failed to get ISBServices: %v", err)
return err
Expand Down Expand Up @@ -206,7 +204,9 @@ func (r *ISBServiceRolloutReconciler) needsUpdate(old, new *apiv1.ISBServiceRoll

// check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata
// TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
// Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made.
// However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"]))
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) {
return true
}
return false
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/numaflowcontrollerrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
)

const (
finalizerName = "numaplane-controller"
finalizerName = "numaplane.numaproj.io/numaplane-controller"
)

const (
Expand Down Expand Up @@ -120,7 +120,7 @@ func loadDefinitions() (map[string]string, error) {
func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("numaflowcontrollerrollout-reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName)

// TODO: only allow one controllerRollout per namespace.
numaflowControllerRollout := &apiv1.NumaflowControllerRollout{}
Expand Down
62 changes: 26 additions & 36 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
Expand All @@ -35,7 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/kubernetes"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)
Expand Down Expand Up @@ -79,7 +78,7 @@ func NewPipelineRolloutReconciler(
func (r *PipelineRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("pipelinerollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("pipelinerollout-reconciler").WithValues("pipelinerollout", req.NamespacedName)
// update the context with this Logger so downstream users can incorporate these values in the logs
ctx = logger.WithLogger(ctx, numaLogger)

Expand Down Expand Up @@ -155,35 +154,22 @@ func (r *PipelineRolloutReconciler) reconcile(
controllerutil.AddFinalizer(pipelineRollout, finalizerName)
}

// apply Pipeline
// todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated
obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "Pipeline",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRollout.Name,
Namespace: pipelineRollout.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)},
},
Spec: pipelineRollout.Spec.Pipeline,
// make a Pipeline object and add/update spec hash on the PipelineRollout object
obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, pipelineRollout)
if err != nil {
numaLogger.Errorf(err, "failed to make a Pipeline object and to update the PipelineRollout: %v", err)
return false, err
}

// Get the object to see if it exists
_, err := kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
// create object as it doesn't exist
if apierrors.IsNotFound(err) {
err = kubernetes.CreateCR(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
return false, err
}
if rolloutChildOp == RolloutChildNew {
err = kubernetes.CreateCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
return false, err
}
} else {
} else if rolloutChildOp == RolloutChildUpdate {
// If the pipeline already exists, first check if the pipeline status
// is pausing. If so, re-enqueue immediately.
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines")
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
numaLogger.Errorf(err, "failed to get Pipeline: %v", err)
return false, err
Expand All @@ -201,13 +187,13 @@ func (r *PipelineRolloutReconciler) reconcile(
// Apply the new spec and resume the pipeline
// TODO: in the future, need to take into account whether Numaflow Controller
// or ISBService is being installed to determine whether it's safe to unpause
newObj, err := setPipelineDesiredStatus(&obj, "Running")
newObj, err := setPipelineDesiredStatus(obj, "Running")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
Expand All @@ -216,31 +202,33 @@ func (r *PipelineRolloutReconciler) reconcile(
}

// If pipeline status is not above, detect if pausing is required.
shouldPause, err := needsPausing(pipeline, &obj)
shouldPause, err := needsPausing(pipeline, obj)
if err != nil {
return false, err
}
if shouldPause {
// Use the existing spec, then pause and re-enqueue
obj.Spec = pipeline.Spec
newObj, err := setPipelineDesiredStatus(&obj, "Paused")
newObj, err := setPipelineDesiredStatus(obj, "Paused")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
return true, err
}

// If no need to pause, just apply the spec
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
} else {
numaLogger.Debug("Pipeline spec is unchanged. No updates will be performed")
}

pipelineRollout.Status.MarkRunning()
Expand Down Expand Up @@ -277,7 +265,9 @@ func (r *PipelineRolloutReconciler) needsUpdate(old, new *apiv1.PipelineRollout)

// check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata
// TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
// Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made.
// However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"]))
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) {
return true
}
return false
Expand Down
Loading

0 comments on commit c708d3a

Please sign in to comment.