diff --git a/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml b/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml index 44d0c2af..b2740dc5 100644 --- a/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml +++ b/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml @@ -163,6 +163,11 @@ spec: - Deployed - Failed type: string + upgradeInProgress: + description: UpgradeInProgress indicates the upgrade strategy currently + being used and affecting the resource state or empty if no upgrade + is in progress + type: string type: object required: - spec diff --git a/config/install.yaml b/config/install.yaml index f0409650..3ae638eb 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -214,6 +214,11 @@ spec: - Deployed - Failed type: string + upgradeInProgress: + description: UpgradeInProgress indicates the upgrade strategy currently + being used and affecting the resource state or empty if no upgrade + is in progress + type: string type: object required: - spec diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 9f86198a..05495121 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -66,23 +66,47 @@ type ISBServiceRolloutReconciler struct { customMetrics *metrics.CustomMetrics // the recorder is used to record events recorder record.EventRecorder + + // maintain inProgressStrategies in memory and in ISBServiceRollout Status + inProgressStrategyMgr *inProgressStrategyMgr } func NewISBServiceRolloutReconciler( - client client.Client, + c client.Client, s *runtime.Scheme, restConfig *rest.Config, customMetrics *metrics.CustomMetrics, recorder record.EventRecorder, ) *ISBServiceRolloutReconciler { - return &ISBServiceRolloutReconciler{ - client, + r := &ISBServiceRolloutReconciler{ + c, s, restConfig, customMetrics, recorder, + nil, } + + r.inProgressStrategyMgr = 
newInProgressStrategyMgr( + // getRolloutStrategy function: + func(ctx context.Context, rollout client.Object) *apiv1.UpgradeStrategy { + isbServiceRollout := rollout.(*apiv1.ISBServiceRollout) + + if isbServiceRollout.Status.UpgradeInProgress != "" { + return (*apiv1.UpgradeStrategy)(&isbServiceRollout.Status.UpgradeInProgress) + } else { + return nil + } + }, + // setRolloutStrategy function: + func(ctx context.Context, rollout client.Object, strategy apiv1.UpgradeStrategy) { + isbServiceRollout := rollout.(*apiv1.ISBServiceRollout) + isbServiceRollout.Status.SetUpgradeInProgress(strategy) + }, + ) + + return r } //+kubebuilder:rbac:groups=numaplane.numaproj.io,resources=isbservicerollouts,verbs=get;list;watch;create;update;patch;delete @@ -277,33 +301,61 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont // update our Status with the ISBService's Status r.processISBServiceStatus(ctx, existingISBServiceDef, isbServiceRollout) - // if I need to update or am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing - isbServiceNeedsUpdating, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) + // if I am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing + _, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) if err != nil { return false, err } - numaLogger.Debugf("isbServiceNeedsUpdating=%t, isbServiceIsUpdating=%t", isbServiceNeedsUpdating, isbServiceIsUpdating) + // determine if we're trying to update the ISBService spec + // if it's a simple change, direct apply + // if not, it will require PPND or Progressive + isbServiceNeedsToUpdate, upgradeStrategyType, err := usde.ResourceNeedsUpdating(ctx, newISBServiceDef, existingISBServiceDef) + if err != nil { + return false, err + } + numaLogger. 
+ WithValues("isbserviceNeedsToUpdate", isbServiceNeedsToUpdate, "upgradeStrategyType", upgradeStrategyType). + Debug("Upgrade decision result") // set the Status appropriately to "Pending" or "Deployed" - // if isbServiceNeedsUpdating - this means there's a mismatch between the desired ISBService spec and actual ISBService spec + // if isbServiceNeedsToUpdate - this means there's a mismatch between the desired ISBService spec and actual ISBService spec // Note that this will be reset to "Deployed" later on if a deployment occurs - if isbServiceNeedsUpdating { + if isbServiceNeedsToUpdate { isbServiceRollout.Status.MarkPending() } else { isbServiceRollout.Status.MarkDeployed(isbServiceRollout.Generation) } - // determine the Upgrade Strategy user prefers - upgradeStrategy, err := usde.GetUserStrategy(ctx, isbServiceRollout.Namespace) + // what is the preferred strategy for this namespace? + userPreferredStrategy, err := usde.GetUserStrategy(ctx, newISBServiceDef.Namespace) if err != nil { return false, err } - switch upgradeStrategy { - case config.PPNDStrategyID: + // is there currently an inProgressStrategy for the isbService? (This will override any new decision) + inProgressStrategy := r.inProgressStrategyMgr.getStrategy(ctx, isbServiceRollout) + inProgressStrategySet := (inProgressStrategy != apiv1.UpgradeStrategyNoOp) - return processChildObjectWithPPND(ctx, r.client, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error { + // if not, should we set one? 
+ if !inProgressStrategySet { + if userPreferredStrategy == config.PPNDStrategyID { + if upgradeStrategyType == apiv1.UpgradeStrategyPPND { + inProgressStrategy = apiv1.UpgradeStrategyPPND + r.inProgressStrategyMgr.setStrategy(ctx, isbServiceRollout, inProgressStrategy) + } + } + if userPreferredStrategy == config.ProgressiveStrategyID { + if upgradeStrategyType == apiv1.UpgradeStrategyProgressive { + inProgressStrategy = apiv1.UpgradeStrategyProgressive + r.inProgressStrategyMgr.setStrategy(ctx, isbServiceRollout, inProgressStrategy) + } + } + } + + switch inProgressStrategy { + case apiv1.UpgradeStrategyPPND: + done, err := processChildObjectWithPPND(ctx, r.client, isbServiceRollout, r, isbServiceNeedsToUpdate, isbServiceIsUpdating, func() error { r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update") err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) if err != nil { @@ -312,8 +364,17 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) return nil }) - case config.NoStrategyID: - if isbServiceNeedsUpdating { + if err != nil { + return false, err + } + if done { + r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) + } else { + // requeue if done with PPND is false + return true, nil + } + case apiv1.UpgradeStrategyNoOp: + if isbServiceNeedsToUpdate { // update ISBService err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) if err != nil { @@ -321,10 +382,10 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) } - case config.ProgressiveStrategyID: + case apiv1.UpgradeStrategyProgressive: return false, 
errors.New("Progressive Strategy not supported yet") default: - return false, fmt.Errorf("%v strategy not recognized", upgradeStrategy) + return false, fmt.Errorf("%v strategy not recognized", inProgressStrategy) } return false, nil diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index 3f70c152..6e905e5c 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -35,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" @@ -260,119 +261,133 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { recorder := record.NewFakeRecorder(64) - r := &ISBServiceRolloutReconciler{ - client: numaplaneClient, - scheme: scheme.Scheme, - restConfig: restConfig, - customMetrics: customMetrics, - recorder: recorder, - } + r := NewISBServiceRolloutReconciler(numaplaneClient, scheme.Scheme, restConfig, customMetrics, recorder) pipelineROReconciler = &PipelineRolloutReconciler{queue: util.NewWorkQueue("fake_queue")} + ppndUpgradeStrategy := apiv1.UpgradeStrategyPPND + testCases := []struct { - name string - newISBSvcSpec numaflowv1.InterStepBufferServiceSpec - existingISBSvcDef *numaflowv1.InterStepBufferService - existingStatefulSetDef *appsv1.StatefulSet - existingPipeline *numaflowv1.Pipeline - expectedRolloutPhase apiv1.Phase + name string + newISBSvcSpec numaflowv1.InterStepBufferServiceSpec + existingISBSvcDef *numaflowv1.InterStepBufferService + existingStatefulSetDef *appsv1.StatefulSet + existingPipeline *numaflowv1.Pipeline + existingInProgressStrategy *apiv1.UpgradeStrategy + expectedRolloutPhase apiv1.Phase // require these Conditions to be set (note that in real life, previous reconciliations may have set other 
Conditions from before which are still present) - expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus - expectedISBSvcSpec numaflowv1.InterStepBufferServiceSpec + expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus + expectedISBSvcSpec numaflowv1.InterStepBufferServiceSpec + expectedInProgressStrategy apiv1.UpgradeStrategy }{ { - name: "new ISBService", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: nil, - existingStatefulSetDef: nil, - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "new ISBService", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + existingISBSvcDef: nil, + existingStatefulSetDef: nil, + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), + expectedRolloutPhase: apiv1.PhaseDeployed, + existingInProgressStrategy: nil, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { - name: "existing ISBService - no change", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + name: "existing ISBService - no change", + newISBSvcSpec: 
createDefaultISBServiceSpec("2.10.3"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), + expectedRolloutPhase: apiv1.PhaseDeployed, + expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + existingInProgressStrategy: nil, + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { - name: "existing ISBService - new spec - pipelines not paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), - expectedRolloutPhase: apiv1.PhasePending, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + name: "existing ISBService - new spec - pipelines not paused", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), + expectedRolloutPhase: apiv1.PhasePending, + expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + existingInProgressStrategy: nil, + expectedInProgressStrategy: 
apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines paused", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), + expectedRolloutPhase: apiv1.PhaseDeployed, + existingInProgressStrategy: &ppndUpgradeStrategy, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines failed", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines failed", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: 
createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed, map[string]string{}), + existingInProgressStrategy: &ppndUpgradeStrategy, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines set to allow data loss", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhasePending, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing, map[string]string{common.LabelKeyAllowDataLoss: "true"}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines set to allow data loss", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhasePending, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing, map[string]string{common.LabelKeyAllowDataLoss: "true"}), + expectedRolloutPhase: apiv1.PhaseDeployed, + existingInProgressStrategy: nil, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - spec already updated - isbsvc reconciling", - newISBSvcSpec: 
createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - spec already updated - isbsvc reconciling", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), + expectedRolloutPhase: apiv1.PhaseDeployed, + existingInProgressStrategy: &ppndUpgradeStrategy, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - spec already updated - isbsvc done reconciling", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - spec already updated - isbsvc done reconciling", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), + existingPipeline: 
createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), + expectedRolloutPhase: apiv1.PhaseDeployed, + existingInProgressStrategy: nil, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ - apiv1.ConditionPausingPipelines: metav1.ConditionFalse, + apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, } @@ -397,6 +412,14 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { // create ISBServiceRollout definition rollout := createISBServiceRollout(tc.newISBSvcSpec) + if tc.existingInProgressStrategy != nil { + rollout.Status.UpgradeInProgress = *tc.existingInProgressStrategy + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultISBSvcRolloutName}, *tc.existingInProgressStrategy) + } else { + rollout.Status.UpgradeInProgress = apiv1.UpgradeStrategyNoOp + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultISBSvcRolloutName}, apiv1.UpgradeStrategyNoOp) + } + // the Reconcile() function does this, so we need to do it before calling reconcile() as well rollout.Status.Init(rollout.Generation) @@ -437,6 +460,8 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { ////// check results: // Check Phase of Rollout: assert.Equal(t, tc.expectedRolloutPhase, rollout.Status.Phase) + // Check In-Progress Strategy + assert.Equal(t, tc.expectedInProgressStrategy, rollout.Status.UpgradeInProgress) // Check isbsvc resultISBSVC, err := numaflowClientSet.NumaflowV1alpha1().InterStepBufferServices(defaultNamespace).Get(ctx, defaultISBSvcRolloutName, metav1.GetOptions{}) assert.NoError(t, err) diff --git a/internal/controller/numaflowcontrollerrollout_controller.go b/internal/controller/numaflowcontrollerrollout_controller.go 
index 1f194ed9..1130a178 100644 --- a/internal/controller/numaflowcontrollerrollout_controller.go +++ b/internal/controller/numaflowcontrollerrollout_controller.go @@ -292,7 +292,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile( controllerRollout.Status.MarkDeployed(controllerRollout.Generation) } - needsRequeue, err := processChildObjectWithPPND(ctx, r.client, controllerRollout, r, controllerDeploymentNeedsUpdating, + done, err := processChildObjectWithPPND(ctx, r.client, controllerRollout, r, controllerDeploymentNeedsUpdating, controllerDeploymentIsUpdating, func() error { r.recorder.Eventf(controllerRollout, corev1.EventTypeNormal, "AllPipelinesPaused", "All Pipelines have paused so Numaflow Controller can safely update") phase, err := r.sync(controllerRollout, namespace, numaLogger) @@ -309,7 +309,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile( if err != nil { return ctrl.Result{}, err } - if needsRequeue { + if !done { return common.DefaultDelayedRequeue, nil } diff --git a/internal/controller/pause_requester.go b/internal/controller/pause_requester.go index a0859cd5..d1ab09aa 100644 --- a/internal/controller/pause_requester.go +++ b/internal/controller/pause_requester.go @@ -27,7 +27,7 @@ type PauseRequester interface { // process a child object, pausing pipelines or resuming pipelines if needed // return: -// - true if needs a requeue +// - true if done with PPND -// - error if any (note we'll automatically reuqueue if there's an error anyway) +// - error if any (note we'll automatically requeue if there's an error anyway) func processChildObjectWithPPND(ctx context.Context, k8sclient client.Client, rollout client.Object, pauseRequester PauseRequester, resourceNeedsUpdating bool, resourceIsUpdating bool, updateFunc func() error) (bool, error) { @@ -65,7 +65,7 @@ func processChildObjectWithPPND(ctx context.Context, k8sclient client.Client, ro } } - return true, nil + return false, nil } else { // remove any pause requirement if necessary @@ -75,7 +75,7 @@ func processChildObjectWithPPND(ctx context.Context, 
k8sclient client.Client, ro } } - return false, nil + return true, nil } // request that the Pipelines corresponding to this Rollout pause diff --git a/internal/usde/usde_test.go b/internal/usde/usde_test.go index 5d6b5d04..afcd2507 100644 --- a/internal/usde/usde_test.go +++ b/internal/usde/usde_test.go @@ -9,11 +9,13 @@ import ( "github.com/numaproj/numaplane/internal/controller/config" "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + apiresource "k8s.io/apimachinery/pkg/api/resource" ) const defaultNamespace = "default" @@ -61,6 +63,24 @@ var defaultPipelineSpec = numaflowv1.PipelineSpec{ }, } +var volSize, _ = apiresource.ParseQuantity("10Mi") +var memLimit, _ = apiresource.ParseQuantity("10Mi") +var newMemLimit, _ = apiresource.ParseQuantity("20Mi") +var defaultISBServiceSpec = numaflowv1.InterStepBufferServiceSpec{ + Redis: nil, + JetStream: &numaflowv1.JetStreamBufferService{ + Version: "2.9.6", + Persistence: &numaflowv1.PersistenceStrategy{ + VolumeSize: &volSize, + }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + }, + }, + }, +} + func makePipelineDefinition(pipelineSpec numaflowv1.PipelineSpec) kubernetes.GenericObject { pipelineSpecRaw, _ := json.Marshal(pipelineSpec) @@ -85,12 +105,38 @@ func makePipelineDefinition(pipelineSpec numaflowv1.PipelineSpec) kubernetes.Gen } } +func makeISBServiceDefinition(isbServiceSpec numaflowv1.InterStepBufferServiceSpec) kubernetes.GenericObject { + isbServiceSpecRaw, _ := json.Marshal(isbServiceSpec) + + isbrs := apiv1.ISBServiceRolloutSpec{ + InterStepBufferService: apiv1.InterStepBufferService{ + Spec: runtime.RawExtension{ + Raw: 
isbServiceSpecRaw, + }, + }, + } + + return kubernetes.GenericObject{ + TypeMeta: metav1.TypeMeta{ + Kind: "InterStepBufferService", + APIVersion: "numaflow.numaproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-isbsvc", + Namespace: defaultNamespace, + }, + Spec: isbrs.InterStepBufferService.Spec, + } + +} + func Test_ResourceNeedsUpdating(t *testing.T) { ctx := context.Background() configManager := config.GetConfigManagerInstance() pipelineDefn := makePipelineDefinition(defaultPipelineSpec) + isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) testCases := []struct { name string @@ -306,6 +352,23 @@ func Test_ResourceNeedsUpdating(t *testing.T) { expectedNeedsUpdating: false, expectedStrategy: apiv1.UpgradeStrategyNoOp, }, + { + name: "isb test", + newSpec: isbServiceDefn, + existingSpec: func() kubernetes.GenericObject { + newISBServiceSpec := defaultISBServiceSpec.DeepCopy() + newISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits = v1.ResourceList{v1.ResourceMemory: newMemLimit} + return makeISBServiceDefinition(*newISBServiceSpec) + }(), + usdeConfig: config.USDEConfig{ + DefaultUpgradeStrategy: config.PPNDStrategyID, + PipelineSpecExcludedPaths: []string{"vertices.source.something"}, + ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, + }, + namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, + expectedNeedsUpdating: true, + expectedStrategy: apiv1.UpgradeStrategyApply, + }, } for _, tc := range testCases { diff --git a/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go b/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go index 8d2fcb0c..618546bb 100644 --- a/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go +++ b/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go @@ -38,6 +38,9 @@ type InterStepBufferService struct { type ISBServiceRolloutStatus struct { Status `json:",inline"` PauseRequestStatus PauseStatus 
`json:"pauseRequestStatus,omitempty"` + + // UpgradeInProgress indicates the upgrade strategy currently being used and affecting the resource state or empty if no upgrade is in progress + UpgradeInProgress UpgradeStrategy `json:"upgradeInProgress,omitempty"` } // +genclient @@ -66,6 +69,13 @@ type ISBServiceRolloutList struct { func init() { SchemeBuilder.Register(&ISBServiceRollout{}, &ISBServiceRolloutList{}) } +func (status *ISBServiceRolloutStatus) SetUpgradeInProgress(upgradeStrategy UpgradeStrategy) { + status.UpgradeInProgress = upgradeStrategy +} + +func (status *ISBServiceRolloutStatus) ClearUpgradeInProgress() { + status.UpgradeInProgress = "" +} // IsHealthy indicates whether the InterStepBufferService rollout is healthy or not func (isb *ISBServiceRolloutStatus) IsHealthy() bool { diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 4f54695d..5f758d88 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apiresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -125,6 +126,7 @@ var ( } volSize, _ = apiresource.ParseQuantity("10Mi") + memLimit, _ = apiresource.ParseQuantity("20Mi") isbServiceSpec = numaflowv1.InterStepBufferServiceSpec{ Redis: nil, JetStream: &numaflowv1.JetStreamBufferService{ @@ -132,6 +134,25 @@ var ( Persistence: &numaflowv1.PersistenceStrategy{ VolumeSize: &volSize, }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: volSize}, + }, + }, + }, + } + ISBServiceSpecExcludedField = numaflowv1.InterStepBufferServiceSpec{ + Redis: nil, + JetStream: &numaflowv1.JetStreamBufferService{ + Version: "2.9.8", + Persistence: &numaflowv1.PersistenceStrategy{ + VolumeSize: &volSize, + }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + }, + }, }, } @@ -496,6 +517,7 @@ var _ = Describe("Functional e2e", Serial, func() { if dataLossPrevention == "true" { document("Verify that in-progress-strategy gets set to PPND") + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyPPND) verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyPPND) verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName) @@ -509,6 +531,7 @@ var _ = Describe("Functional e2e", Serial, func() { } return true }, testTimeout).Should(BeTrue()) + } verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { @@ -524,6 +547,50 @@ var _ = Describe("Functional e2e", Serial, func() { }) + It("Should update the child ISBService with an excluded field", func() { + + // new ISBService spec + rawSpec, err := 
json.Marshal(ISBServiceSpecExcludedField) + Expect(err).ShouldNot(HaveOccurred()) + + updateISBServiceRolloutInK8S(isbServiceRolloutName, func(rollout apiv1.ISBServiceRollout) (apiv1.ISBServiceRollout, error) { + rollout.Spec.InterStepBufferService.Spec.Raw = rawSpec + return rollout, nil + }) + + document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") + verifyNotPausing := func() bool { + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) + verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) + verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { + return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused + }) + isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) + isbCondStatus := getRolloutCondition(isbRollout.Status.Conditions, apiv1.ConditionPausingPipelines) + plRollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{}) + plCondStatus := getRolloutCondition(plRollout.Status.Conditions, apiv1.ConditionPipelinePausingOrPaused) + if isbCondStatus == metav1.ConditionTrue || plCondStatus == metav1.ConditionTrue { + return false + } + return true + } + + Consistently(verifyNotPausing, 30*time.Second).Should(BeTrue()) + + verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { + return *retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == memLimit + }) + + verifyISBSvcRolloutReady(isbServiceRolloutName) + + verifyISBSvcReady(Namespace, isbServiceRolloutName, 3) + + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) + verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) + 
verifyPipelineRunning(Namespace, pipelineName, 3) + + }) + It("Should update child MonoVertex if the MonoVertexRollout is updated", func() { // new MonoVertex spec diff --git a/tests/e2e/isbservice.go b/tests/e2e/isbservice.go index 4f30649b..3f94f6cc 100644 --- a/tests/e2e/isbservice.go +++ b/tests/e2e/isbservice.go @@ -81,7 +81,7 @@ func verifyISBSvcRolloutReady(isbServiceRolloutName string) { Eventually(func() metav1.ConditionStatus { rollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionPausingPipelines) - }, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionFalse)) + }, testTimeout, testPollingInterval).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown))) } } @@ -134,6 +134,14 @@ func updateISBServiceRolloutInK8S(name string, f func(apiv1.ISBServiceRollout) ( Expect(err).ShouldNot(HaveOccurred()) } +func verifyInProgressStrategyISBService(namespace string, isbsvcRolloutName string, inProgressStrategy apiv1.UpgradeStrategy) { + document("Verifying InProgressStrategy") + Eventually(func() bool { + rollout, _ := isbServiceRolloutClient.Get(ctx, isbsvcRolloutName, metav1.GetOptions{}) + return rollout.Status.UpgradeInProgress == inProgressStrategy + }, testTimeout, testPollingInterval).Should(BeTrue()) +} + func watchISBServiceRollout() { defer wg.Done() diff --git a/tests/manifests/special-cases/pause/usde-config.yaml b/tests/manifests/special-cases/pause/usde-config.yaml index 2b31afff..962277ad 100644 --- a/tests/manifests/special-cases/pause/usde-config.yaml +++ b/tests/manifests/special-cases/pause/usde-config.yaml @@ -10,3 +10,5 @@ data: - "lifecycle" - "limits" - "watermark" + isbServiceSpecExcludedPaths: | + - "jetstream.containerTemplate.resources.limits"