From e49cefb13d0e3f1eaed6017310ee54dd45624d5e Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 30 Sep 2024 16:13:57 -0700 Subject: [PATCH 01/23] feat: integrate USDE logic into isbsvc controller Signed-off-by: Dillen Padhiar --- .../isbservicerollout_controller.go | 77 ++++++++++++++++--- .../isbservicerollout_controller_test.go | 8 +- .../v1alpha1/isbservicerollout_types.go | 10 +++ 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index bf8ac509..5efe4b5e 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -66,23 +66,47 @@ type ISBServiceRolloutReconciler struct { customMetrics *metrics.CustomMetrics // the recorder is used to record events recorder record.EventRecorder + + // maintain inProgressStrategies in memory and in ISBServiceRollout Status + inProgressStrategyMgr *inProgressStrategyMgr } func NewISBServiceRolloutReconciler( - client client.Client, + c client.Client, s *runtime.Scheme, restConfig *rest.Config, customMetrics *metrics.CustomMetrics, recorder record.EventRecorder, ) *ISBServiceRolloutReconciler { - return &ISBServiceRolloutReconciler{ - client, + r := &ISBServiceRolloutReconciler{ + c, s, restConfig, customMetrics, recorder, + nil, } + + r.inProgressStrategyMgr = newInProgressStrategyMgr( + // getRolloutStrategy function: + func(ctx context.Context, rollout client.Object) *apiv1.UpgradeStrategy { + isbServiceRollout := rollout.(*apiv1.ISBServiceRollout) + + if isbServiceRollout.Status.UpgradeInProgress != "" { + return (*apiv1.UpgradeStrategy)(&isbServiceRollout.Status.UpgradeInProgress) + } else { + return nil + } + }, + // setRolloutStrategy function: + func(ctx context.Context, rollout client.Object, strategy apiv1.UpgradeStrategy) { + isbServiceRollout := rollout.(*apiv1.ISBServiceRollout) + isbServiceRollout.Status.SetUpgradeInProgress(strategy) + }, + ) + + return r } //+kubebuilder:rbac:groups=numaplane.numaproj.io,resources=isbservicerollouts,verbs=get;list;watch;create;update;patch;delete @@ -278,31 +302,60 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont r.processISBServiceStatus(ctx, existingISBServiceDef, isbServiceRollout) // if I need to update or am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing + // TODO: do we need the value isbServiceNeedsUpdating still? isbServiceNeedsUpdating, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) if err != nil { return false, err } - numaLogger.Debugf("isbServiceNeedsUpdating=%t, isbServiceIsUpdating=%t", isbServiceNeedsUpdating, isbServiceIsUpdating) + // determine if we're trying to update the ISBService spec + // if it's a simple change, direct apply + // if not, it will require PPND or Progressive + isbServiceNeedsToUpdate, upgradeStrategyType, err := usde.ResourceNeedsUpdating(ctx, newISBServiceDef, existingISBServiceDef) + if err != nil { + return false, err + } + numaLogger. + WithValues("isbserviceNeedsToUpdate", isbServiceNeedsToUpdate, "upgradeStrategyType", upgradeStrategyType). + Debug("Upgrade decision result") // set the Status appropriately to "Pending" or "Deployed" // if isbServiceNeedsUpdating - this means there's a mismatch between the desired ISBService spec and actual ISBService spec // Note that this will be reset to "Deployed" later on if a deployment occurs - if isbServiceNeedsUpdating { + if isbServiceNeedsToUpdate { isbServiceRollout.Status.MarkPending() } else { isbServiceRollout.Status.MarkDeployed(isbServiceRollout.Generation) } - // determine the Upgrade Strategy user prefers - upgradeStrategy, err := usde.GetUserStrategy(ctx, isbServiceRollout.Namespace) + // what is the preferred strategy for this namespace? + userPreferredStrategy, err := usde.GetUserStrategy(ctx, newISBServiceDef.Namespace) if err != nil { return false, err } - switch upgradeStrategy { - case config.PPNDStrategyID: + // is there currently an inProgressStrategy for the isbService? (This will override any new decision) + inProgressStrategy := r.inProgressStrategyMgr.getStrategy(ctx, isbServiceRollout) + inProgressStrategySet := (inProgressStrategy != apiv1.UpgradeStrategyNoOp) + + // if not, should we set one? + if !inProgressStrategySet { + if userPreferredStrategy == config.PPNDStrategyID { + if upgradeStrategyType == apiv1.UpgradeStrategyPPND { + inProgressStrategy = apiv1.UpgradeStrategyPPND + r.inProgressStrategyMgr.setStrategy(ctx, isbServiceRollout, inProgressStrategy) + } + } + if userPreferredStrategy == config.ProgressiveStrategyID { + if upgradeStrategyType == apiv1.UpgradeStrategyProgressive { + inProgressStrategy = apiv1.UpgradeStrategyProgressive + r.inProgressStrategyMgr.setStrategy(ctx, isbServiceRollout, inProgressStrategy) + } + } + } + switch inProgressStrategy { + case apiv1.UpgradeStrategyPPND: return processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error { r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update") err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) @@ -312,7 +365,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) return nil }) - case config.NoStrategyID: + case apiv1.UpgradeStrategyNoOp: if isbServiceNeedsUpdating { // update ISBService err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) @@ -321,10 +374,10 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) } - case config.ProgressiveStrategyID: + case apiv1.UpgradeStrategyProgressive: return false, errors.New("Progressive Strategy not supported yet") default: - return false, fmt.Errorf("%v strategy not recognized", upgradeStrategy) + return false, fmt.Errorf("%v strategy not recognized", inProgressStrategy) } return false, nil diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index 68f8d1ca..ded662e6 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -259,13 +259,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { recorder := record.NewFakeRecorder(64) - r := &ISBServiceRolloutReconciler{ - client: numaplaneClient, - scheme: scheme.Scheme, - restConfig: restConfig, - customMetrics: customMetrics, - recorder: recorder, - } + r := NewISBServiceRolloutReconciler(numaplaneClient, scheme.Scheme, restConfig, customMetrics, recorder) pipelineROReconciler = &PipelineRolloutReconciler{queue: util.NewWorkQueue("fake_queue")} diff --git a/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go b/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go index 8d2fcb0c..618546bb 100644 --- a/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go +++ b/pkg/apis/numaplane/v1alpha1/isbservicerollout_types.go @@ -38,6 +38,9 @@ type InterStepBufferService struct { type ISBServiceRolloutStatus struct { Status `json:",inline"` PauseRequestStatus PauseStatus `json:"pauseRequestStatus,omitempty"` + + // UpgradeInProgress indicates the upgrade strategy currently being used and affecting the resource state or empty if no upgrade is in progress + UpgradeInProgress UpgradeStrategy `json:"upgradeInProgress,omitempty"` } // +genclient @@ -66,6 +69,13 @@ type ISBServiceRolloutList struct { func init() { SchemeBuilder.Register(&ISBServiceRollout{}, &ISBServiceRolloutList{}) } +func (status *ISBServiceRolloutStatus) SetUpgradeInProgress(upgradeStrategy UpgradeStrategy) { + status.UpgradeInProgress = upgradeStrategy +} + +func (status *ISBServiceRolloutStatus) ClearUpgradeInProgress() { + status.UpgradeInProgress = "" +} // IsHealthy indicates whether the InterStepBufferService rollout is healthy or not func (isb *ISBServiceRolloutStatus) IsHealthy() bool { From 398b6ac31d327820f0d9ddb8e5247f2a21b7ff69 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 30 Sep 2024 16:14:06 -0700 Subject: [PATCH 02/23] chore: codegen Signed-off-by: Dillen Padhiar --- .../crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml | 5 +++++ config/install.yaml | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml b/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml index 44d0c2af..b2740dc5 100644 --- a/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml +++ b/config/crd/bases/numaplane.numaproj.io_isbservicerollouts.yaml @@ -163,6 +163,11 @@ spec: - Deployed - Failed type: string + upgradeInProgress: + description: UpgradeInProgress indicates the upgrade strategy currently + being used and affecting the resource state or empty if no upgrade + is in progress + type: string type: object required: - spec diff --git a/config/install.yaml b/config/install.yaml index f0409650..3ae638eb 100644 --- a/config/install.yaml +++ b/config/install.yaml @@ -214,6 +214,11 @@ spec: - Deployed - Failed type: string + upgradeInProgress: + description: UpgradeInProgress indicates the upgrade strategy currently + being used and affecting the resource state or empty if no upgrade + is in progress + type: string type: object required: - spec From 146dd3615270d1cb4a4b1accefda8297231d1357 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Wed, 2 Oct 2024 13:13:29 -0700 Subject: [PATCH 03/23] test: unit tests Signed-off-by: Dillen Padhiar --- .../isbservicerollout_controller_test.go | 148 +++++++++++------- 1 file changed, 89 insertions(+), 59 deletions(-) diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index ded662e6..84529864 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -35,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" @@ -263,97 +264,116 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { pipelineROReconciler = &PipelineRolloutReconciler{queue: util.NewWorkQueue("fake_queue")} + ppndUpgradeStrategy := apiv1.UpgradeStrategyPPND + testCases := []struct { - name string - newISBSvcSpec numaflowv1.InterStepBufferServiceSpec - existingISBSvcDef *numaflowv1.InterStepBufferService - existingStatefulSetDef *appsv1.StatefulSet - existingPipelinePhase numaflowv1.PipelinePhase - expectedRolloutPhase apiv1.Phase + name string + newISBSvcSpec numaflowv1.InterStepBufferServiceSpec + + existingInProgressStrategy *apiv1.UpgradeStrategy + existingISBSvcDef *numaflowv1.InterStepBufferService + existingStatefulSetDef *appsv1.StatefulSet + existingPipelinePhase numaflowv1.PipelinePhase + expectedRolloutPhase apiv1.Phase // require these Conditions to be set (note that in real life, previous reconciliations may have set other Conditions from before which are still present) - expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus - expectedISBSvcSpec numaflowv1.InterStepBufferServiceSpec + expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus + expectedISBSvcSpec numaflowv1.InterStepBufferServiceSpec + expectedInProgressStrategy apiv1.UpgradeStrategy }{ { - name: "new ISBService", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: nil, - existingStatefulSetDef: nil, - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "new ISBService", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + existingISBSvcDef: nil, + existingStatefulSetDef: nil, + existingPipelinePhase: numaflowv1.PipelinePhaseRunning, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { - name: "existing ISBService - no change", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, - expectedRolloutPhase: apiv1.PhaseDeployed, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + name: "existing ISBService - no change", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipelinePhase: numaflowv1.PipelinePhaseRunning, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhaseDeployed, + expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { - name: "existing ISBService - new spec - pipelines not paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, - expectedRolloutPhase: apiv1.PhasePending, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + name: "existing ISBService - new spec - pipelines not paused", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipelinePhase: numaflowv1.PipelinePhaseRunning, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhasePending, + expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines paused", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipelinePhase: numaflowv1.PipelinePhasePaused, + existingInProgressStrategy: &ppndUpgradeStrategy, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines failed", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseFailed, - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines failed", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipelinePhase: numaflowv1.PipelinePhaseFailed, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - spec already updated - isbsvc reconciling", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - spec already updated - isbsvc reconciling", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), + existingPipelinePhase: numaflowv1.PipelinePhasePaused, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { - name: "existing ISBService - spec already updated - isbsvc done reconciling", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - spec already updated - isbsvc done reconciling", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), + existingPipelinePhase: numaflowv1.PipelinePhasePaused, + existingInProgressStrategy: nil, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionFalse, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, } @@ -378,6 +398,14 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { // create ISBServiceRollout definition rollout := createISBServiceRollout(tc.newISBSvcSpec) + if tc.existingInProgressStrategy != nil { + rollout.Status.UpgradeInProgress = *tc.existingInProgressStrategy + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultISBSvcRolloutName}, *tc.existingInProgressStrategy) + } else { + rollout.Status.UpgradeInProgress = apiv1.UpgradeStrategyNoOp + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultISBSvcRolloutName}, apiv1.UpgradeStrategyNoOp) + } + // the Reconcile() function does this, so we need to do it before calling reconcile() as well rollout.Status.Init(rollout.Generation) @@ -419,6 +447,8 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { ////// check results: // Check Phase of Rollout: assert.Equal(t, tc.expectedRolloutPhase, rollout.Status.Phase) + // Check In-Progress Strategy + assert.Equal(t, tc.expectedInProgressStrategy, rollout.Status.UpgradeInProgress) // Check isbsvc resultISBSVC, err := numaflowClientSet.NumaflowV1alpha1().InterStepBufferServices(defaultNamespace).Get(ctx, defaultISBSvcRolloutName, metav1.GetOptions{}) assert.NoError(t, err) From 2e722d693b8d3005b6bcf25ab73698945c3d7a67 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 10:25:55 -0700 Subject: [PATCH 04/23] test: changes for checking pipeline paused cond Signed-off-by: Dillen Padhiar --- .../controller/isbservicerollout_controller_test.go | 6 +++--- tests/e2e/isbservice.go | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index 84529864..fac6c411 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -353,13 +353,13 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), existingPipelinePhase: numaflowv1.PipelinePhasePaused, - existingInProgressStrategy: nil, + existingInProgressStrategy: &ppndUpgradeStrategy, expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - spec already updated - isbsvc done reconciling", @@ -370,7 +370,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingInProgressStrategy: nil, expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ - apiv1.ConditionPausingPipelines: metav1.ConditionFalse, + apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, diff --git a/tests/e2e/isbservice.go b/tests/e2e/isbservice.go index 4f30649b..3f94f6cc 100644 --- a/tests/e2e/isbservice.go +++ b/tests/e2e/isbservice.go @@ -81,7 +81,7 @@ func verifyISBSvcRolloutReady(isbServiceRolloutName string) { Eventually(func() metav1.ConditionStatus { rollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) return getRolloutCondition(rollout.Status.Conditions, apiv1.ConditionPausingPipelines) - }, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionFalse)) + }, testTimeout, testPollingInterval).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown))) } } @@ -134,6 +134,14 @@ func updateISBServiceRolloutInK8S(name string, f func(apiv1.ISBServiceRollout) ( Expect(err).ShouldNot(HaveOccurred()) } +func verifyInProgressStrategyISBService(namespace string, isbsvcRolloutName string, inProgressStrategy apiv1.UpgradeStrategy) { + document("Verifying InProgressStrategy") + Eventually(func() bool { + rollout, _ := isbServiceRolloutClient.Get(ctx, isbsvcRolloutName, metav1.GetOptions{}) + return rollout.Status.UpgradeInProgress == inProgressStrategy + }, testTimeout, testPollingInterval).Should(BeTrue()) +} + func watchISBServiceRollout() { defer wg.Done() From 92dfa431a4753c991078a8deffeed3501b762e42 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 11:15:59 -0700 Subject: [PATCH 05/23] test: check inprogressstrategy of isbservice Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 4f54695d..f2e65897 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -496,6 +496,7 @@ var _ = Describe("Functional e2e", Serial, func() { if dataLossPrevention == "true" { document("Verify that in-progress-strategy gets set to PPND") + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyPPND) verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyPPND) verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName) From 075c649e8120afcbb06c2ecf23c97ca45558b223 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 14:22:18 -0700 Subject: [PATCH 06/23] test: add case for noop update on isbservice Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 61 +++++++++++++++++++ .../special-cases/pause/usde-config.yaml | 2 + 2 files changed, 63 insertions(+) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index f2e65897..dc2efb14 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apiresource "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -125,6 +126,7 @@ var ( } volSize, _ = apiresource.ParseQuantity("10Mi") + memLimit, _ = apiresource.ParseQuantity("10Gi") isbServiceSpec = numaflowv1.InterStepBufferServiceSpec{ Redis: nil, JetStream: &numaflowv1.JetStreamBufferService{ @@ -134,6 +136,20 @@ var ( }, }, } + ISBServiceSpecExcludedField = numaflowv1.InterStepBufferServiceSpec{ + Redis: nil, + JetStream: &numaflowv1.JetStreamBufferService{ + Version: "2.9.8", + Persistence: &numaflowv1.PersistenceStrategy{ + VolumeSize: &volSize, + }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + }, + }, + }, + } monoVertexSpec = numaflowv1.MonoVertexSpec{ Replicas: ptr.To(int32(1)), @@ -525,6 +541,51 @@ var _ = Describe("Functional e2e", Serial, func() { }) + It("Should update the child ISBService with an excluded field", func() { + + // new ISBService spec + rawSpec, err := json.Marshal(ISBServiceSpecExcludedField) + Expect(err).ShouldNot(HaveOccurred()) + + updateISBServiceRolloutInK8S(isbServiceRolloutName, func(rollout apiv1.ISBServiceRollout) (apiv1.ISBServiceRollout, error) { + rollout.Spec.InterStepBufferService.Spec.Raw = rawSpec + return rollout, nil + }) + + if dataLossPrevention == "true" { + + document("Verify that in-progress-strategy gets set to NoOp") + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) + verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) + // verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName) + + Eventually(func() bool { + isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) + isbCondStatus := getRolloutCondition(isbRollout.Status.Conditions, apiv1.ConditionPausingPipelines) + plRollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{}) + plCondStatus := getRolloutCondition(plRollout.Status.Conditions, apiv1.ConditionPipelinePausingOrPaused) + if isbCondStatus == metav1.ConditionTrue || plCondStatus == metav1.ConditionTrue { + return false + } + return true + }, testTimeout).Should(BeTrue()) + } + + verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { + // return retrievedISBServiceSpec.JetStream.Version == "2.9.8" + return retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == &memLimit + }) + + verifyISBSvcRolloutReady(isbServiceRolloutName) + + verifyISBSvcReady(Namespace, isbServiceRolloutName, 3) + + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) + verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) + verifyPipelineRunning(Namespace, pipelineName, 3) + + }) + It("Should update child MonoVertex if the MonoVertexRollout is updated", func() { // new MonoVertex spec diff --git a/tests/manifests/special-cases/pause/usde-config.yaml b/tests/manifests/special-cases/pause/usde-config.yaml index 2b31afff..962277ad 100644 --- a/tests/manifests/special-cases/pause/usde-config.yaml +++ b/tests/manifests/special-cases/pause/usde-config.yaml @@ -10,3 +10,5 @@ data: - "lifecycle" - "limits" - "watermark" + isbServiceSpecExcludedPaths: | + - "jetstream.containerTemplate.resources.limits" From 2a6aeb5f55538b231ed9f999873f774dd081e56a Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 14:22:33 -0700 Subject: [PATCH 07/23] fix: unset strategy Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 5efe4b5e..18c6c46b 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -363,6 +363,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont return err } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) + r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) return nil }) case apiv1.UpgradeStrategyNoOp: @@ -374,6 +375,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) } + // r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) case apiv1.UpgradeStrategyProgressive: return false, errors.New("Progressive Strategy not supported yet") default: From 309e2cf96abe971ea02c57be84dd1561110f8e62 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 14:34:56 -0700 Subject: [PATCH 08/23] test: update unit tests Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index fac6c411..b64d8deb 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -331,7 +331,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { name: "existing ISBService - new spec - pipelines failed", @@ -345,7 +345,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { name: "existing ISBService - spec already updated - isbsvc reconciling", From a1dcfe512d0f77346c9f94115f63174fd1b61ce1 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 14:40:03 -0700 Subject: [PATCH 09/23] test: verifyISBServiceSpec compares pointer values Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index dc2efb14..613147d4 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -573,7 +573,7 @@ var _ = Describe("Functional e2e", Serial, func() { verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { // return retrievedISBServiceSpec.JetStream.Version == "2.9.8" - return retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == &memLimit + return *retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == memLimit }) verifyISBSvcRolloutReady(isbServiceRolloutName) From f69b7dd9ef78fefc39045e7c1a4f632e6e845500 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Thu, 3 Oct 2024 15:00:07 -0700 Subject: [PATCH 10/23] fix: pipeline wasn't unpausing Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 18c6c46b..65bfbe2d 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -356,16 +356,23 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont switch inProgressStrategy { case apiv1.UpgradeStrategyPPND: - return processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error { + done, err := processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error { r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update") err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) if err != nil { return err } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) - r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) + // r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) return nil }) + if err != nil { + return false, err + } + if done { + r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) + return done, nil + } case apiv1.UpgradeStrategyNoOp: if isbServiceNeedsUpdating { // update ISBService From 721635cd1261e2fb8a41cce28f207775b8eace30 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 11:05:27 -0700 Subject: [PATCH 11/23] feat: unset strategy Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller.go | 1 - tests/e2e/functional_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 65bfbe2d..6c0fa92b 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -363,7 +363,6 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont return err } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) - // r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) return nil }) if err != nil { diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 613147d4..cafc567e 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -126,7 +126,7 @@ var ( } volSize, _ = apiresource.ParseQuantity("10Mi") - memLimit, _ = apiresource.ParseQuantity("10Gi") + memLimit, _ = apiresource.ParseQuantity("10Mi") isbServiceSpec = numaflowv1.InterStepBufferServiceSpec{ Redis: nil, JetStream: &numaflowv1.JetStreamBufferService{ From e0446c77bea324f92f909b0b6ce157f588e1282e Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 11:21:50 -0700 Subject: [PATCH 12/23] test: update unit test Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index b64d8deb..bd6a2237 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -316,7 +316,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { expectedRolloutPhase: apiv1.PhasePending, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { name: "existing ISBService - new spec - pipelines paused", @@ -359,7 +359,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, + expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { name: "existing ISBService - spec already updated - isbsvc done reconciling", From baaed8186b3a5f5d7378a611fc16290b900dd260 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 14:05:33 -0700 Subject: [PATCH 13/23] fix: invert processchildobjectwithPPND func Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller.go | 12 ++++++------ .../controller/isbservicerollout_controller_test.go | 8 ++++---- .../numaflowcontrollerrollout_controller.go | 4 ++-- internal/controller/pause_requester.go | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 6c0fa92b..170717ad 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -302,8 +302,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont r.processISBServiceStatus(ctx, existingISBServiceDef, isbServiceRollout) // if I need to update or am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing - // TODO: do we need the value isbServiceNeedsUpdating still? - isbServiceNeedsUpdating, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) + _, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) if err != nil { return false, err } @@ -356,7 +355,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont switch inProgressStrategy { case apiv1.UpgradeStrategyPPND: - done, err := processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error { + done, err := processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsToUpdate, isbServiceIsUpdating, func() error { r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update") err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) if err != nil { @@ -370,10 +369,12 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } if done { r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) - return done, nil + } else { + // requeue if done with PPND is false + return true, nil } case apiv1.UpgradeStrategyNoOp: - if isbServiceNeedsUpdating { + if isbServiceNeedsToUpdate { // update ISBService err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef) if err != nil { @@ -381,7 +382,6 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont } r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerISBSVCRollout, "update").Observe(time.Since(syncStartTime).Seconds()) } - // r.inProgressStrategyMgr.unsetStrategy(ctx, isbServiceRollout) case apiv1.UpgradeStrategyProgressive: return false, errors.New("Progressive Strategy not supported yet") default: diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index bd6a2237..fac6c411 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -316,7 +316,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { expectedRolloutPhase: apiv1.PhasePending, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - new spec - pipelines paused", @@ -331,7 +331,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - new spec - pipelines failed", @@ -345,7 +345,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - spec already updated - isbsvc reconciling", @@ -359,7 +359,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - spec already updated - isbsvc done reconciling", diff --git a/internal/controller/numaflowcontrollerrollout_controller.go b/internal/controller/numaflowcontrollerrollout_controller.go index cd44da23..4357855a 100644 --- a/internal/controller/numaflowcontrollerrollout_controller.go +++ b/internal/controller/numaflowcontrollerrollout_controller.go @@ -292,7 +292,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile( controllerRollout.Status.MarkDeployed(controllerRollout.Generation) } - needsRequeue, err := processChildObjectWithPPND(ctx, controllerRollout, r, controllerDeploymentNeedsUpdating, + done, err := processChildObjectWithPPND(ctx, controllerRollout, r, controllerDeploymentNeedsUpdating, controllerDeploymentIsUpdating, func() error { r.recorder.Eventf(controllerRollout, corev1.EventTypeNormal, "AllPipelinesPaused", "All Pipelines have paused so Numaflow Controller can safely update") phase, err := r.sync(controllerRollout, namespace, numaLogger) @@ -309,7 +309,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile( if err != nil { return ctrl.Result{}, err } - if needsRequeue { + if !done { return common.DefaultDelayedRequeue, nil } diff --git a/internal/controller/pause_requester.go b/internal/controller/pause_requester.go index 502459b0..4cb294fb 100644 --- a/internal/controller/pause_requester.go +++ b/internal/controller/pause_requester.go @@ -26,7 +26,7 @@ type PauseRequester interface { // process a child object, pausing pipelines or resuming pipelines if needed // return: -// - true if needs a requeue +// - true if done with PPND // - error if any (note we'll automatically reuqueue if there's an error anyway) func processChildObjectWithPPND(ctx context.Context, rollout client.Object, pauseRequester PauseRequester, resourceNeedsUpdating bool, resourceIsUpdating bool, updateFunc func() error) (bool, error) { @@ -64,7 +64,7 @@ func processChildObjectWithPPND(ctx context.Context, rollout client.Object, paus } } - return true, nil + return false, nil } else { // remove any pause requirement if necessary @@ -74,7 +74,7 @@ func processChildObjectWithPPND(ctx context.Context, rollout client.Object, paus } } - return false, nil + return true, nil } // request that the Pipelines corresponding to this Rollout pause From f601571a37fd0ef12a6012bb2991faee34ccf2e7 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 14:25:34 -0700 Subject: [PATCH 14/23] test: remove comments and add check that pl never pauses Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index cafc567e..7b1e6538 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -557,7 +557,13 @@ var _ = Describe("Functional e2e", Serial, func() { document("Verify that in-progress-strategy gets set to NoOp") verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) - // verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName) + + if dataLossPrevention == "true" { + document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") + verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { + return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused + }) + } Eventually(func() bool { isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) @@ -572,7 +578,6 @@ var _ = Describe("Functional e2e", Serial, func() { } verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { - // return retrievedISBServiceSpec.JetStream.Version == "2.9.8" return *retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == memLimit }) From b8c843b9b5f7a73cd02214415d2b984b86672094 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 14:54:38 -0700 Subject: [PATCH 15/23] fix: remove extra variable check in test case Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 7b1e6538..f2bc81ea 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -558,12 +558,10 @@ var _ = Describe("Functional e2e", Serial, func() { verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) - if dataLossPrevention == "true" { - document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") - verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { - return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused - }) - } + document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") + verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { + return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused + }) Eventually(func() bool { isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) From a20a315ecc04e87b0b4d7eeac720c981447d3bc5 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 16:04:30 -0700 Subject: [PATCH 16/23] test: address review comments Signed-off-by: Dillen Padhiar --- internal/controller/isbservicerollout_controller.go | 2 +- tests/e2e/functional_test.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/controller/isbservicerollout_controller.go b/internal/controller/isbservicerollout_controller.go index 170717ad..251f9c91 100644 --- a/internal/controller/isbservicerollout_controller.go +++ b/internal/controller/isbservicerollout_controller.go @@ -301,7 +301,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont // update our Status with the ISBService's Status r.processISBServiceStatus(ctx, existingISBServiceDef, isbServiceRollout) - // if I need to update or am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing + // if I am in the middle of an update of the ISBService, then I need to make sure all the Pipelines are pausing _, isbServiceIsUpdating, err := r.isISBServiceUpdating(ctx, isbServiceRollout, existingISBServiceDef) if err != nil { return false, err diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index f2bc81ea..699c5773 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -554,16 +554,12 @@ var _ = Describe("Functional e2e", Serial, func() { if dataLossPrevention == "true" { - document("Verify that in-progress-strategy gets set to NoOp") - verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) - verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) - document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused }) - Eventually(func() bool { + Consistently(func() bool { isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) isbCondStatus := getRolloutCondition(isbRollout.Status.Conditions, apiv1.ConditionPausingPipelines) plRollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{}) @@ -572,7 +568,7 @@ var _ = Describe("Functional e2e", Serial, func() { return false } return true - }, testTimeout).Should(BeTrue()) + }, "30s").Should(BeTrue()) } verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { From d0ea107f72a337476ac71784fe53faec9aaa070d Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Fri, 4 Oct 2024 16:17:41 -0700 Subject: [PATCH 17/23] fix: increase Consistently time Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 699c5773..3ea86d41 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -568,7 +568,7 @@ var _ = Describe("Functional e2e", Serial, func() { return false } return true - }, "30s").Should(BeTrue()) + }, testTimeout).Should(BeTrue()) } verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { From 12491c870cbf3b2b2766636e73ef1b17e8bad4c7 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 7 Oct 2024 13:34:15 -0700 Subject: [PATCH 18/23] fix: update func test for usde bug Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 41 +++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 3ea86d41..c2cd9006 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -126,7 +126,7 @@ var ( } volSize, _ = apiresource.ParseQuantity("10Mi") - memLimit, _ = apiresource.ParseQuantity("10Mi") + memLimit, _ = apiresource.ParseQuantity("20Mi") isbServiceSpec = numaflowv1.InterStepBufferServiceSpec{ Redis: nil, JetStream: &numaflowv1.JetStreamBufferService{ @@ -134,6 +134,11 @@ var ( Persistence: &numaflowv1.PersistenceStrategy{ VolumeSize: &volSize, }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: volSize}, + }, + }, }, } ISBServiceSpecExcludedField = numaflowv1.InterStepBufferServiceSpec{ @@ -526,6 +531,7 @@ var _ = Describe("Functional e2e", Serial, func() { } return true }, testTimeout).Should(BeTrue()) + } verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { @@ -552,25 +558,30 @@ var _ = Describe("Functional e2e", Serial, func() { return rollout, nil }) - if dataLossPrevention == "true" { - - document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") + document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") + // verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { + // return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused + // }) + verifyNotPausing := func() bool { + verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) + verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused }) - - Consistently(func() bool { - isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) - isbCondStatus := getRolloutCondition(isbRollout.Status.Conditions, apiv1.ConditionPausingPipelines) - plRollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{}) - plCondStatus := getRolloutCondition(plRollout.Status.Conditions, apiv1.ConditionPipelinePausingOrPaused) - if isbCondStatus == metav1.ConditionTrue || plCondStatus == metav1.ConditionTrue { - return false - } - return true - }, testTimeout).Should(BeTrue()) + isbRollout, _ := isbServiceRolloutClient.Get(ctx, isbServiceRolloutName, metav1.GetOptions{}) + isbCondStatus := getRolloutCondition(isbRollout.Status.Conditions, apiv1.ConditionPausingPipelines) + plRollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{}) + plCondStatus := getRolloutCondition(plRollout.Status.Conditions, apiv1.ConditionPipelinePausingOrPaused) + if isbCondStatus == metav1.ConditionTrue || plCondStatus == metav1.ConditionTrue { + return false + } + return true } + Eventually(verifyNotPausing, testTimeout).Should(BeTrue()) + + Consistently(verifyNotPausing, 30*time.Second).Should(BeTrue()) + verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool { return *retrievedISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits.Memory() == memLimit }) From c41e7d26f4afedb82a13efa27ad03dc616802585 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 7 Oct 2024 13:56:56 -0700 Subject: [PATCH 19/23] fix: merge main Signed-off-by: Dillen Padhiar --- .../isbservicerollout_controller_test.go | 77 ++++--------------- internal/usde/usde_test.go | 67 ++++++++++++++++ 2 files changed, 80 insertions(+), 64 deletions(-) diff --git a/internal/controller/isbservicerollout_controller_test.go b/internal/controller/isbservicerollout_controller_test.go index 55f83680..6e905e5c 100644 --- a/internal/controller/isbservicerollout_controller_test.go +++ b/internal/controller/isbservicerollout_controller_test.go @@ -268,19 +268,12 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { ppndUpgradeStrategy := apiv1.UpgradeStrategyPPND testCases := []struct { - name string - newISBSvcSpec numaflowv1.InterStepBufferServiceSpec - existingISBSvcDef *numaflowv1.InterStepBufferService - existingStatefulSetDef *appsv1.StatefulSet - existingPipeline *numaflowv1.Pipeline - expectedRolloutPhase apiv1.Phase - name string - newISBSvcSpec numaflowv1.InterStepBufferServiceSpec - - existingInProgressStrategy *apiv1.UpgradeStrategy + name string + newISBSvcSpec numaflowv1.InterStepBufferServiceSpec existingISBSvcDef *numaflowv1.InterStepBufferService existingStatefulSetDef *appsv1.StatefulSet - existingPipelinePhase numaflowv1.PipelinePhase + existingPipeline *numaflowv1.Pipeline + existingInProgressStrategy *apiv1.UpgradeStrategy expectedRolloutPhase apiv1.Phase // require these Conditions to be set (note that in real life, previous reconciliations may have set other Conditions from before which are still present) expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus @@ -294,13 +287,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingStatefulSetDef: nil, existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}), expectedRolloutPhase: apiv1.PhaseDeployed, - name: "new ISBService", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: nil, - existingStatefulSetDef: nil, - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, existingInProgressStrategy: nil, - expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, @@ -316,15 +303,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - name: "existing ISBService - no change", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, existingInProgressStrategy: nil, - expectedRolloutPhase: apiv1.PhaseDeployed, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp, }, { @@ -336,15 +315,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { expectedRolloutPhase: apiv1.PhasePending, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), - name: "existing ISBService - new spec - pipelines not paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseRunning, existingInProgressStrategy: nil, - expectedRolloutPhase: apiv1.PhasePending, - expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue}, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"), expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { @@ -354,13 +325,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), expectedRolloutPhase: apiv1.PhaseDeployed, - name: "existing ISBService - new spec - pipelines paused", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, existingInProgressStrategy: &ppndUpgradeStrategy, - expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, @@ -369,16 +334,18 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { - name: "existing ISBService - new spec - pipelines failed", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed, map[string]string{}), - expectedRolloutPhase: apiv1.PhaseDeployed, + name: "existing ISBService - new spec - pipelines failed", + newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), + existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), + existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed, map[string]string{}), + existingInProgressStrategy: &ppndUpgradeStrategy, + expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, - expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), + expectedInProgressStrategy: apiv1.UpgradeStrategyPPND, }, { name: "existing ISBService - new spec - pipelines set to allow data loss", @@ -387,13 +354,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing, map[string]string{common.LabelKeyAllowDataLoss: "true"}), expectedRolloutPhase: apiv1.PhaseDeployed, - name: "existing ISBService - new spec - pipelines failed", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true), - existingPipelinePhase: numaflowv1.PipelinePhaseFailed, existingInProgressStrategy: nil, - expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, @@ -407,13 +368,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), expectedRolloutPhase: apiv1.PhaseDeployed, - name: "existing ISBService - spec already updated - isbsvc reconciling", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, existingInProgressStrategy: &ppndUpgradeStrategy, - expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionPausingPipelines: metav1.ConditionTrue, }, @@ -427,13 +382,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) { existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}), expectedRolloutPhase: apiv1.PhaseDeployed, - name: "existing ISBService - spec already updated - isbsvc done reconciling", - newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"), - existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true), - existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true), - existingPipelinePhase: numaflowv1.PipelinePhasePaused, existingInProgressStrategy: nil, - expectedRolloutPhase: apiv1.PhaseDeployed, expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{ apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue, }, diff --git a/internal/usde/usde_test.go b/internal/usde/usde_test.go index 5d6b5d04..14603aeb 100644 --- a/internal/usde/usde_test.go +++ b/internal/usde/usde_test.go @@ -9,11 +9,13 @@ import ( "github.com/numaproj/numaplane/internal/controller/config" "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + apiresource "k8s.io/apimachinery/pkg/api/resource" ) const defaultNamespace = "default" @@ -61,6 +63,24 @@ var defaultPipelineSpec = numaflowv1.PipelineSpec{ }, } +var volSize, _ = apiresource.ParseQuantity("10Mi") +var memLimit, _ = apiresource.ParseQuantity("10Mi") +var newMemLimit, _ = apiresource.ParseQuantity("20Mi") +var defaultISBServiceSpec = numaflowv1.InterStepBufferServiceSpec{ + Redis: nil, + JetStream: &numaflowv1.JetStreamBufferService{ + Version: "2.9.6", + Persistence: &numaflowv1.PersistenceStrategy{ + VolumeSize: &volSize, + }, + // ContainerTemplate: &numaflowv1.ContainerTemplate{ + // Resources: v1.ResourceRequirements{ + // Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + // }, + // }, + }, +} + func makePipelineDefinition(pipelineSpec numaflowv1.PipelineSpec) kubernetes.GenericObject { pipelineSpecRaw, _ := json.Marshal(pipelineSpec) @@ -85,12 +105,38 @@ func makePipelineDefinition(pipelineSpec numaflowv1.PipelineSpec) kubernetes.Gen } } +func makeISBServiceDefinition(isbServiceSpec numaflowv1.InterStepBufferServiceSpec) kubernetes.GenericObject { + isbServiceSpecRaw, _ := json.Marshal(isbServiceSpec) + + isbrs := apiv1.ISBServiceRolloutSpec{ + InterStepBufferService: apiv1.InterStepBufferService{ + Spec: runtime.RawExtension{ + Raw: isbServiceSpecRaw, + }, + }, + } + + return kubernetes.GenericObject{ + TypeMeta: metav1.TypeMeta{ + Kind: "InterStepBufferService", + APIVersion: "numaflow.numaproj.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-isbsvc", + Namespace: defaultNamespace, + }, + Spec: isbrs.InterStepBufferService.Spec, + } + +} + func Test_ResourceNeedsUpdating(t *testing.T) { ctx := context.Background() configManager := config.GetConfigManagerInstance() pipelineDefn := makePipelineDefinition(defaultPipelineSpec) + isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) testCases := []struct { name string @@ -306,6 +352,27 @@ func Test_ResourceNeedsUpdating(t *testing.T) { expectedNeedsUpdating: false, expectedStrategy: apiv1.UpgradeStrategyNoOp, }, + { + name: "isb test", + newSpec: isbServiceDefn, + existingSpec: func() kubernetes.GenericObject { + newISBServiceSpec := defaultISBServiceSpec.DeepCopy() + newISBServiceSpec.JetStream.ContainerTemplate = &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + }, + } + return makeISBServiceDefinition(*newISBServiceSpec) + }(), + usdeConfig: config.USDEConfig{ + DefaultUpgradeStrategy: config.PPNDStrategyID, + PipelineSpecExcludedPaths: []string{"vertices.source.something"}, + ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, + }, + namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, + expectedNeedsUpdating: true, + expectedStrategy: apiv1.UpgradeStrategyApply, + }, } for _, tc := range testCases { From 9ed54b25f6cd91a939fc2da0250ccc46baade654 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 7 Oct 2024 13:57:39 -0700 Subject: [PATCH 20/23] chore: comment out usde case Signed-off-by: Dillen Padhiar --- internal/usde/usde_test.go | 45 +++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/internal/usde/usde_test.go b/internal/usde/usde_test.go index 14603aeb..f4b3a31a 100644 --- a/internal/usde/usde_test.go +++ b/internal/usde/usde_test.go @@ -9,7 +9,6 @@ import ( "github.com/numaproj/numaplane/internal/controller/config" "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -136,7 +135,7 @@ func Test_ResourceNeedsUpdating(t *testing.T) { configManager := config.GetConfigManagerInstance() pipelineDefn := makePipelineDefinition(defaultPipelineSpec) - isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) + // isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) testCases := []struct { name string @@ -352,27 +351,27 @@ func Test_ResourceNeedsUpdating(t *testing.T) { expectedNeedsUpdating: false, expectedStrategy: apiv1.UpgradeStrategyNoOp, }, - { - name: "isb test", - newSpec: isbServiceDefn, - existingSpec: func() kubernetes.GenericObject { - newISBServiceSpec := defaultISBServiceSpec.DeepCopy() - newISBServiceSpec.JetStream.ContainerTemplate = &numaflowv1.ContainerTemplate{ - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, - }, - } - return makeISBServiceDefinition(*newISBServiceSpec) - }(), - usdeConfig: config.USDEConfig{ - DefaultUpgradeStrategy: config.PPNDStrategyID, - PipelineSpecExcludedPaths: []string{"vertices.source.something"}, - ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, - }, - namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, - expectedNeedsUpdating: true, - expectedStrategy: apiv1.UpgradeStrategyApply, - }, + // { + // name: "isb test", + // newSpec: isbServiceDefn, + // existingSpec: func() kubernetes.GenericObject { + // newISBServiceSpec := defaultISBServiceSpec.DeepCopy() + // newISBServiceSpec.JetStream.ContainerTemplate = &numaflowv1.ContainerTemplate{ + // Resources: v1.ResourceRequirements{ + // Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + // }, + // } + // return makeISBServiceDefinition(*newISBServiceSpec) + // }(), + // usdeConfig: config.USDEConfig{ + // DefaultUpgradeStrategy: config.PPNDStrategyID, + // PipelineSpecExcludedPaths: []string{"vertices.source.something"}, + // ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, + // }, + // namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, + // expectedNeedsUpdating: true, + // expectedStrategy: apiv1.UpgradeStrategyApply, + // }, } for _, tc := range testCases { From 9c03871cdb039632522afe5ab46146de0e8145c9 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 7 Oct 2024 14:14:25 -0700 Subject: [PATCH 21/23] chore: usde test Signed-off-by: Dillen Padhiar --- internal/usde/usde_test.go | 51 ++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/internal/usde/usde_test.go b/internal/usde/usde_test.go index f4b3a31a..afcd2507 100644 --- a/internal/usde/usde_test.go +++ b/internal/usde/usde_test.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaplane/internal/controller/config" "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -72,11 +73,11 @@ var defaultISBServiceSpec = numaflowv1.InterStepBufferServiceSpec{ Persistence: &numaflowv1.PersistenceStrategy{ VolumeSize: &volSize, }, - // ContainerTemplate: &numaflowv1.ContainerTemplate{ - // Resources: v1.ResourceRequirements{ - // Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, - // }, - // }, + ContainerTemplate: &numaflowv1.ContainerTemplate{ + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, + }, + }, }, } @@ -135,7 +136,7 @@ func Test_ResourceNeedsUpdating(t *testing.T) { configManager := config.GetConfigManagerInstance() pipelineDefn := makePipelineDefinition(defaultPipelineSpec) - // isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) + isbServiceDefn := makeISBServiceDefinition(defaultISBServiceSpec) testCases := []struct { name string @@ -351,27 +352,23 @@ func Test_ResourceNeedsUpdating(t *testing.T) { expectedNeedsUpdating: false, expectedStrategy: apiv1.UpgradeStrategyNoOp, }, - // { - // name: "isb test", - // newSpec: isbServiceDefn, - // existingSpec: func() kubernetes.GenericObject { - // newISBServiceSpec := defaultISBServiceSpec.DeepCopy() - // newISBServiceSpec.JetStream.ContainerTemplate = &numaflowv1.ContainerTemplate{ - // Resources: v1.ResourceRequirements{ - // Limits: v1.ResourceList{v1.ResourceMemory: memLimit}, - // }, - // } - // return makeISBServiceDefinition(*newISBServiceSpec) - // }(), - // usdeConfig: config.USDEConfig{ - // DefaultUpgradeStrategy: config.PPNDStrategyID, - // PipelineSpecExcludedPaths: []string{"vertices.source.something"}, - // ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, - // }, - // namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, - // expectedNeedsUpdating: true, - // expectedStrategy: apiv1.UpgradeStrategyApply, - // }, + { + name: "isb test", + newSpec: isbServiceDefn, + existingSpec: func() kubernetes.GenericObject { + newISBServiceSpec := defaultISBServiceSpec.DeepCopy() + newISBServiceSpec.JetStream.ContainerTemplate.Resources.Limits = v1.ResourceList{v1.ResourceMemory: newMemLimit} + return makeISBServiceDefinition(*newISBServiceSpec) + }(), + usdeConfig: config.USDEConfig{ + DefaultUpgradeStrategy: config.PPNDStrategyID, + PipelineSpecExcludedPaths: []string{"vertices.source.something"}, + ISBServiceSpecExcludedPaths: []string{"jetstream.containerTemplate.resources.limits"}, + }, + namespaceConfig: &config.NamespaceConfig{UpgradeStrategy: "pause-and-drain"}, + expectedNeedsUpdating: true, + expectedStrategy: apiv1.UpgradeStrategyApply, + }, } for _, tc := range testCases { From e7724f1d0844a7aee540b2b6b3ec44ea082197b5 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 7 Oct 2024 16:06:39 -0700 Subject: [PATCH 22/23] chore: remove comment Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index c2cd9006..678084d3 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -559,9 +559,6 @@ var _ = Describe("Functional e2e", Serial, func() { }) document("Verify that dependent Pipeline is not paused when an update to ISBService not requiring pause is made") - // verifyPipelineStatusConsistently(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool { - // return retrievedPipelineStatus.Phase != numaflowv1.PipelinePhasePaused - // }) verifyNotPausing := func() bool { verifyInProgressStrategyISBService(Namespace, isbServiceRolloutName, apiv1.UpgradeStrategyNoOp) verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp) From dcdd03e816f2a68bf7aa2a0d8f68e9874b0c27bf Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Tue, 8 Oct 2024 10:43:49 -0700 Subject: [PATCH 23/23] test: remove eventually block Signed-off-by: Dillen Padhiar --- tests/e2e/functional_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 678084d3..5f758d88 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -575,8 +575,6 @@ var _ = Describe("Functional e2e", Serial, func() { return true } - Eventually(verifyNotPausing, testTimeout).Should(BeTrue()) - Consistently(verifyNotPausing, 30*time.Second).Should(BeTrue()) verifyISBServiceSpec(Namespace, isbServiceRolloutName, func(retrievedISBServiceSpec numaflowv1.InterStepBufferServiceSpec) bool {