Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spec hash annotation #52

Merged
merged 11 commits into from
Jun 12, 2024
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ bin/*
go.work

config/crd/bases/_.yaml
.idea
.idea

afugazzotto marked this conversation as resolved.
Show resolved Hide resolved
config/crd/external/
38 changes: 19 additions & 19 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,7 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/kubernetes"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)
Expand Down Expand Up @@ -77,7 +76,7 @@ func NewISBServiceRolloutReconciler(
func (r *ISBServiceRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("isbservicerollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("isbservicerollout-reconciler").WithValues("isbservicerollout", req.NamespacedName)

isbServiceRollout := &apiv1.ISBServiceRollout{}
if err := r.client.Get(ctx, req.NamespacedName, isbServiceRollout); err != nil {
Expand Down Expand Up @@ -143,29 +142,28 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR
controllerutil.AddFinalizer(isbServiceRollout, finalizerName)
}

// apply ISBService
// todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated
obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "InterStepBufferService",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: isbServiceRollout.Name,
Namespace: isbServiceRollout.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(isbServiceRollout.GetObjectMeta(), apiv1.ISBServiceRolloutGroupVersionKind)},
},
Spec: isbServiceRollout.Spec.InterStepBufferService,
// make an InterStepBufferService object and add/update spec hash on the ISBServiceRollout object
obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, isbServiceRollout)
if err != nil {
numaLogger.Errorf(err, "failed to make an InterStepBufferService object and to update the ISBServiceRollout: %v", err)
return err
}

// TODO: instead of doing this, modify the ApplyCRSpec below to be similar to what is done on the PipelineRollout controller code
if rolloutChildOp == RolloutChildNone {
numaLogger.Debug("InterStepBufferService spec is unchanged. No updates will be performed")
return nil
}

err := kubernetes.ApplyCRSpec(ctx, r.restConfig, &obj, "interstepbufferservices")
err = kubernetes.ApplyCRSpec(ctx, r.restConfig, obj, "interstepbufferservices")
if err != nil {
numaLogger.Errorf(err, "failed to apply CR: %v", err)
isbServiceRollout.Status.MarkFailed("ApplyISBServiceFailure", err.Error())
return err
}

// after the Apply, Get the ISBService so that we can propagate its health into our Status
isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "interstepbufferservices")
isbsvc, err := kubernetes.GetCR(ctx, r.restConfig, obj, "interstepbufferservices")
if err != nil {
numaLogger.Errorf(err, "failed to get ISBServices: %v", err)
return err
Expand Down Expand Up @@ -206,7 +204,9 @@ func (r *ISBServiceRolloutReconciler) needsUpdate(old, new *apiv1.ISBServiceRoll

// check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata
// TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
// Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made.
// However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"]))
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) {
return true
}
return false
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/numaflowcontrollerrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
)

const (
finalizerName = "numaplane-controller"
finalizerName = "numaplane.numaproj.io/numaplane-controller"
)

const (
Expand Down Expand Up @@ -120,7 +120,7 @@ func loadDefinitions() (map[string]string, error) {
func (r *NumaflowControllerRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("numaflowcontrollerrollout-reconciler").WithValues("numaflowcontrollerrollout", req.NamespacedName)

// TODO: only allow one controllerRollout per namespace.
numaflowControllerRollout := &apiv1.NumaflowControllerRollout{}
Expand Down
62 changes: 26 additions & 36 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
Expand All @@ -35,7 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/kubernetes"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)
Expand Down Expand Up @@ -79,7 +78,7 @@ func NewPipelineRolloutReconciler(
func (r *PipelineRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// update the Base Logger's level according to the Numaplane Config
logger.RefreshBaseLoggerLevel()
numaLogger := logger.GetBaseLogger().WithName("reconciler").WithValues("pipelinerollout", req.NamespacedName)
numaLogger := logger.GetBaseLogger().WithName("pipelinerollout-reconciler").WithValues("pipelinerollout", req.NamespacedName)
// update the context with this Logger so downstream users can incorporate these values in the logs
ctx = logger.WithLogger(ctx, numaLogger)

Expand Down Expand Up @@ -155,35 +154,22 @@ func (r *PipelineRolloutReconciler) reconcile(
controllerutil.AddFinalizer(pipelineRollout, finalizerName)
}

// apply Pipeline
// todo: store hash of spec in annotation; use to compare to determine if anything needs to be updated
obj := kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "Pipeline",
APIVersion: "numaflow.numaproj.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: pipelineRollout.Name,
Namespace: pipelineRollout.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pipelineRollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)},
},
Spec: pipelineRollout.Spec.Pipeline,
// make a Pipeline object and add/update spec hash on the PipelineRollout object
obj, rolloutChildOp, err := makeChildResourceFromRolloutAndUpdateSpecHash(ctx, r.restConfig, pipelineRollout)
if err != nil {
numaLogger.Errorf(err, "failed to make a Pipeline object and to update the PipelineRollout: %v", err)
return false, err
}

// Get the object to see if it exists
_, err := kubernetes.GetResource(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
// create object as it doesn't exist
if apierrors.IsNotFound(err) {
err = kubernetes.CreateCR(ctx, r.restConfig, &obj, "pipelines")
if err != nil {
return false, err
}
if rolloutChildOp == RolloutChildNew {
err = kubernetes.CreateCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
return false, err
}
} else {
} else if rolloutChildOp == RolloutChildUpdate {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afugazzotto @xdevxy So, the logic in here only occurs if the spec of the child object has changed, right? I think there's some logic in here related to pausing that we need to do even if that's not the case, isn't there? Such as calling isPipelinePaused()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I also noticed that we need to call processPipelineStatus to update the status correctly. I'm making those change now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should only set the annotation on the Rollout spec until we truly update the object itself.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to think so, right? The annotation should represent that which has actually been deployed.

In the pause logic below we're essentially trying to update the spec. If we don't set the annotation until we've actually deployed the object to that spec, then it is okay to use the conditional "if spec is different" to determine whether to execute that logic.

Copy link
Contributor Author

@afugazzotto afugazzotto Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a valid point. So, it may be best to set the hash annotation only after a successful call to ApplyCRSpec? Would it make sense to set the hash annotation together with the status (ex: in processPipelineStatus)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it may be best to set the hash annotation only after a successful call to ApplyCRSpec?

yes

Would it make sense to set the hash annotation together with the status (ex: in processPipelineStatus)?

processPipelineStatus() should be called all the time

Copy link
Contributor Author

@afugazzotto afugazzotto Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #56

// If the pipeline already exists, first check if the pipeline status
// is pausing. If so, re-enqueue immediately.
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, &obj, "pipelines")
pipeline, err := kubernetes.GetCR(ctx, r.restConfig, obj, "pipelines")
if err != nil {
numaLogger.Errorf(err, "failed to get Pipeline: %v", err)
return false, err
Expand All @@ -201,13 +187,13 @@ func (r *PipelineRolloutReconciler) reconcile(
// Apply the new spec and resume the pipeline
// TODO: in the future, need to take into account whether Numaflow Controller
// or ISBService is being installed to determine whether it's safe to unpause
newObj, err := setPipelineDesiredStatus(&obj, "Running")
newObj, err := setPipelineDesiredStatus(obj, "Running")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
Expand All @@ -216,31 +202,33 @@ func (r *PipelineRolloutReconciler) reconcile(
}

// If pipeline status is not above, detect if pausing is required.
shouldPause, err := needsPausing(pipeline, &obj)
shouldPause, err := needsPausing(pipeline, obj)
if err != nil {
return false, err
}
if shouldPause {
// Use the existing spec, then pause and re-enqueue
obj.Spec = pipeline.Spec
newObj, err := setPipelineDesiredStatus(&obj, "Paused")
newObj, err := setPipelineDesiredStatus(obj, "Paused")
if err != nil {
return false, err
}
obj = *newObj
obj = newObj

err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
return true, err
}

// If no need to pause, just apply the spec
err = applyPipelineSpec(ctx, r.restConfig, &obj, pipelineRollout)
err = applyPipelineSpec(ctx, r.restConfig, obj, pipelineRollout)
if err != nil {
return false, err
}
} else {
numaLogger.Debug("Pipeline spec is unchanged. No updates will be performed")
}

pipelineRollout.Status.MarkRunning()
Expand Down Expand Up @@ -277,7 +265,9 @@ func (r *PipelineRolloutReconciler) needsUpdate(old, new *apiv1.PipelineRollout)

// check for any fields we might update in the Spec - generally we'd only update a Finalizer or maybe something in the metadata
// TODO: we would need to update this if we ever add anything else, like a label or annotation - unless there's a generic check that makes sense
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
// Checking only the Finalizers and Annotations allows to have more control on updating only when certain changes have been made.
// However, do we want to be also more specific? For instance, check specific finalizers and annotations (ex: .DeepEqual(old.Annotations["somekey"], new.Annotations["somekey"]))
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) || !equality.Semantic.DeepEqual(old.Annotations, new.Annotations) {
return true
}
return false
Expand Down
Loading
Loading