From 2a0cbd51ead0e934d8513f0bf7d3e101edd0efa5 Mon Sep 17 00:00:00 2001 From: Hao Hao Date: Wed, 2 Oct 2024 16:06:47 -0700 Subject: [PATCH 1/2] add pipeline no down time upgrade Signed-off-by: Hao Hao --- .../controller/pipelinerollout_controller.go | 56 +++++---- .../pipelinerollout_controller_test.go | 10 +- .../controller/pipelinerollout_progressive.go | 110 ++++++++++++++++++ 3 files changed, 152 insertions(+), 24 deletions(-) create mode 100644 internal/controller/pipelinerollout_progressive.go diff --git a/internal/controller/pipelinerollout_controller.go b/internal/controller/pipelinerollout_controller.go index bd5f6b33..f342e17b 100644 --- a/internal/controller/pipelinerollout_controller.go +++ b/internal/controller/pipelinerollout_controller.go @@ -374,7 +374,7 @@ func (r *PipelineRolloutReconciler) reconcile( controllerutil.AddFinalizer(pipelineRollout, finalizerName) } - newPipelineDef, err := r.makePipelineDefinition(ctx, pipelineRollout) + newPipelineDef, err := r.makeRunningPipelineDefinition(ctx, pipelineRollout) if err != nil { return false, err } @@ -506,15 +506,13 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context, case apiv1.UpgradeStrategyProgressive: if pipelineNeedsToUpdate { - numaLogger.Error(errors.New("Progressive not supported"), "Progressive not supported") - //TODO (just an idea of what it might look like below...) - // done, err := r.processExistingPipelineWithProgressive(...) 
- //if err != nil { - // return err - //} - //if done { - // r.unsetInProgressStrategy(namespacedName) - //} + done, err := r.processExistingPipelineWithProgressive(ctx, pipelineRollout, newPipelineDef, pipelineNeedsToUpdate) + if err != nil { + return err + } + if done { + r.inProgressStrategyMgr.unsetStrategy(ctx, pipelineRollout) + } } default: if pipelineNeedsToUpdate && upgradeStrategyType == apiv1.UpgradeStrategyApply { @@ -539,7 +537,7 @@ func pipelineObservedGenerationCurrent(generation int64, observedGeneration int6 func (r *PipelineRolloutReconciler) processPipelineStatus(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) error { numaLogger := logger.FromContext(ctx) - pipelineDef, err := r.makePipelineDefinition(ctx, pipelineRollout) + pipelineDef, err := r.makeRunningPipelineDefinition(ctx, pipelineRollout) if err != nil { return err } @@ -705,7 +703,7 @@ func updatePipelineSpec(ctx context.Context, restConfig *rest.Config, obj *kuber return kubernetes.UpdateCR(ctx, restConfig, obj, "pipelines") } -func pipelineLabels(pipelineRollout *apiv1.PipelineRollout) (map[string]string, error) { +func pipelineLabels(pipelineRollout *apiv1.PipelineRollout, upgradeState string) (map[string]string, error) { var pipelineSpec PipelineSpec labelMapping := map[string]string{ common.LabelKeyISBServiceNameForPipeline: "default", @@ -718,7 +716,7 @@ func pipelineLabels(pipelineRollout *apiv1.PipelineRollout) (map[string]string, } labelMapping[common.LabelKeyPipelineRolloutForPipeline] = pipelineRollout.Name - labelMapping[common.LabelKeyUpgradeState] = string(common.LabelValueUpgradePromoted) + labelMapping[common.LabelKeyUpgradeState] = upgradeState return labelMapping, nil } @@ -732,20 +730,24 @@ func (r *PipelineRolloutReconciler) updatePipelineRolloutStatusToFailed(ctx cont } // getPipelineName retrieves the name of the current running pipeline managed by the given -// pipelineRollout through the `promoted` label. 
If no such pipeline exists, then it -constructs the name by calculating the suffix and appending to the PipelineRollout name. -func (r *PipelineRolloutReconciler) getPipelineName(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) (string, error) { +// pipelineRollout through the `promoted` label. If no such pipeline exists, then construct +// the name by calculating the suffix and appending it to the PipelineRollout name. +func (r *PipelineRolloutReconciler) getPipelineName( + ctx context.Context, + pipelineRollout *apiv1.PipelineRollout, + upgradeState string, +) (string, error) { pipelines, err := kubernetes.ListCR( ctx, r.restConfig, common.NumaflowAPIGroup, common.NumaflowAPIVersion, "pipelines", pipelineRollout.Namespace, fmt.Sprintf( "%s=%s,%s=%s", common.LabelKeyPipelineRolloutForPipeline, pipelineRollout.Name, - common.LabelKeyUpgradeState, common.LabelValueUpgradePromoted, + common.LabelKeyUpgradeState, upgradeState, ), "") if err != nil { return "", err } if len(pipelines) > 1 { - return "", fmt.Errorf("there should only be one promoted pipeline") + return "", fmt.Errorf("there should only be one promoted or upgrade in progress pipeline") } else if len(pipelines) == 0 { suffixName, err := r.calPipelineNameSuffix(ctx, pipelineRollout) if err != nil { @@ -770,16 +772,28 @@ func (r *PipelineRolloutReconciler) calPipelineNameSuffix(ctx context.Context, p return "-" + fmt.Sprint(*pipelineRollout.Status.NameCount), nil } -func (r *PipelineRolloutReconciler) makePipelineDefinition(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) (*kubernetes.GenericObject, error) { - labels, err := pipelineLabels(pipelineRollout) +func (r *PipelineRolloutReconciler) makeRunningPipelineDefinition( + ctx context.Context, + pipelineRollout *apiv1.PipelineRollout, +) (*kubernetes.GenericObject, error) { + pipelineName, err := r.getPipelineName(ctx, pipelineRollout, string(common.LabelValueUpgradePromoted)) if err != nil { return nil, err } 
- pipelineName, err := r.getPipelineName(ctx, pipelineRollout) + + labels, err := pipelineLabels(pipelineRollout, string(common.LabelValueUpgradePromoted)) if err != nil { return nil, err } + return r.makePipelineDefinition(pipelineRollout, pipelineName, labels) +} + +func (r *PipelineRolloutReconciler) makePipelineDefinition( + pipelineRollout *apiv1.PipelineRollout, + pipelineName string, + labels map[string]string, +) (*kubernetes.GenericObject, error) { return &kubernetes.GenericObject{ TypeMeta: metav1.TypeMeta{ Kind: "Pipeline", diff --git a/internal/controller/pipelinerollout_controller_test.go b/internal/controller/pipelinerollout_controller_test.go index d73ec11b..e3427cba 100644 --- a/internal/controller/pipelinerollout_controller_test.go +++ b/internal/controller/pipelinerollout_controller_test.go @@ -589,24 +589,28 @@ func TestPipelineLabels(t *testing.T) { tests := []struct { name string jsonInput string + upgradeState string expectedLabel string expectError bool }{ { name: "Valid Input", jsonInput: `{"interStepBufferServiceName": "buffer-service"}`, + upgradeState: string(common.LabelValueUpgradePromoted), expectedLabel: "buffer-service", expectError: false, }, { name: "Missing InterStepBufferServiceName", jsonInput: `{}`, + upgradeState: string(common.LabelValueUpgradePromoted), expectedLabel: "default", expectError: false, }, { name: "Invalid JSON", jsonInput: `{"interStepBufferServiceName": "buffer-service"`, + upgradeState: string(common.LabelValueUpgradeInProgress), expectedLabel: "", expectError: true, }, @@ -627,7 +631,7 @@ func TestPipelineLabels(t *testing.T) { }, } - labels, err := pipelineLabels(pipelineRollout) + labels, err := pipelineLabels(pipelineRollout, tt.upgradeState) if (err != nil) != tt.expectError { t.Errorf("pipelineLabels() error = %v, expectError %v", err, tt.expectError) return @@ -641,8 +645,8 @@ func TestPipelineLabels(t *testing.T) { t.Errorf("pipelineLabels() = %v, expected %v", 
common.LabelKeyPipelineRolloutForPipeline, pipelineRolloutName) } - if labels[common.LabelKeyUpgradeState] != string(common.LabelValueUpgradePromoted) { - t.Errorf("pipelineLabels() = %v, expected %v", common.LabelKeyUpgradeState, string(common.LabelValueUpgradePromoted)) + if labels[common.LabelKeyUpgradeState] != tt.upgradeState { + t.Errorf("pipelineLabels() = %v, expected %v", common.LabelKeyUpgradeState, tt.upgradeState) } } }) diff --git a/internal/controller/pipelinerollout_progressive.go b/internal/controller/pipelinerollout_progressive.go new file mode 100644 index 00000000..6f7cca6c --- /dev/null +++ b/internal/controller/pipelinerollout_progressive.go @@ -0,0 +1,110 @@ +package controller + +import ( + "context" + "fmt" + numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaplane/internal/common" + "github.com/numaproj/numaplane/internal/util/kubernetes" + "github.com/numaproj/numaplane/internal/util/logger" + apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" +) + +func (r *PipelineRolloutReconciler) processExistingPipelineWithProgressive( + ctx context.Context, pipelineRollout *apiv1.PipelineRollout, + newPipelineDef *kubernetes.GenericObject, pipelineNeedsToUpdate bool, +) (bool, error) { + numaLogger := logger.FromContext(ctx) + + newUpgradingPipelineDef, err := r.makeUpgradingPipelineDefinition(ctx, pipelineRollout) + if err != nil { + return false, err + } + + // Get the object to see if it exists + _, err = kubernetes.GetCR(ctx, r.restConfig, newUpgradingPipelineDef, "pipelines") + if err != nil { + // create object as it doesn't exist + if apierrors.IsNotFound(err) { + + //pipelineRollout.Status.MarkPending() + + numaLogger.Debugf("Upgrading Pipeline %s/%s doesn't exist so creating", newUpgradingPipelineDef.Namespace, newUpgradingPipelineDef.Name) + err = kubernetes.CreateCR(ctx, r.restConfig, newUpgradingPipelineDef, "pipelines") + if err 
!= nil { + return false, err + } + //pipelineRollout.Status.MarkDeployed(pipelineRollout.Generation) + //r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerPipelineRollout, "create").Observe(time.Since(syncStartTime).Seconds()) + return false, nil + } + + return false, fmt.Errorf("error getting Pipeline: %v", err) + } + + err = r.processUpgradingPipelineStatus(ctx, pipelineRollout) + if err != nil { + return false, err + } + + return true, nil +} + +func (r *PipelineRolloutReconciler) makeUpgradingPipelineDefinition( + ctx context.Context, + pipelineRollout *apiv1.PipelineRollout, +) (*kubernetes.GenericObject, error) { + pipelineName, err := r.getPipelineName(ctx, pipelineRollout, string(common.LabelValueUpgradeInProgress)) + if err != nil { + return nil, err + } + + labels, err := pipelineLabels(pipelineRollout, string(common.LabelValueUpgradeInProgress)) + if err != nil { + return nil, err + } + + return r.makePipelineDefinition(pipelineRollout, pipelineName, labels) +} + +func (r *PipelineRolloutReconciler) processUpgradingPipelineStatus( + ctx context.Context, + pipelineRollout *apiv1.PipelineRollout, +) error { + numaLogger := logger.FromContext(ctx) + + pipelineDef, err := r.makeUpgradingPipelineDefinition(ctx, pipelineRollout) + if err != nil { + return err + } + + // Get existing upgrading Pipeline + existingUpgradingPipelineDef, err := kubernetes.GetCR(ctx, r.restConfig, pipelineDef, "pipelines") + if err != nil { + if apierrors.IsNotFound(err) { + numaLogger.WithValues("pipelineDefinition", *pipelineDef).Warn("Pipeline not found. 
Unable to process status during this reconciliation.") + } else { + return fmt.Errorf("error getting Pipeline for status processing: %v", err) + } + } + + pipelineStatus, err := kubernetes.ParseStatus(existingUpgradingPipelineDef) + if err != nil { + return fmt.Errorf("failed to parse Pipeline Status from pipeline CR: %+v, %v", existingUpgradingPipelineDef, err) + } + + pipelinePhase := numaflowv1.PipelinePhase(pipelineStatus.Phase) + if pipelinePhase == numaflowv1.PipelinePhaseFailed { + // pipelineRO.status = "PROGRESSIVE failed" + } else if pipelinePhase == numaflowv1.PipelinePhaseRunning { + // TODO: label the new pipeline as promoted + // pause pipeline_old + } else { + // TODO: ensure the latest pipeline spec is applied + // apply pipeline_new + //continue (re-enqueue) + } + + return nil +} From 0ce7f869ddd3d2b1ea9aef63fe8c27f7cf612f05 Mon Sep 17 00:00:00 2001 From: Hao Hao Date: Sun, 6 Oct 2024 23:10:26 -0700 Subject: [PATCH 2/2] pipeline no down time Signed-off-by: Hao Hao --- .../controller/pipelinerollout_controller.go | 8 +- .../pipelinerollout_controller_test.go | 112 ++++++++++++++++++ .../controller/pipelinerollout_progressive.go | 83 +++++++++---- .../v1alpha1/pipelinerollout_types.go | 12 ++ 4 files changed, 191 insertions(+), 24 deletions(-) diff --git a/internal/controller/pipelinerollout_controller.go b/internal/controller/pipelinerollout_controller.go index f342e17b..c0d3c16e 100644 --- a/internal/controller/pipelinerollout_controller.go +++ b/internal/controller/pipelinerollout_controller.go @@ -506,7 +506,7 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context, case apiv1.UpgradeStrategyProgressive: if pipelineNeedsToUpdate { - done, err := r.processExistingPipelineWithProgressive(ctx, pipelineRollout, newPipelineDef, pipelineNeedsToUpdate) + done, err := r.processExistingPipelineWithProgressive(ctx, pipelineRollout, existingPipelineDef) if err != nil { return err } @@ -514,6 +514,7 @@ func (r 
*PipelineRolloutReconciler) processExistingPipeline(ctx context.Context, r.inProgressStrategyMgr.unsetStrategy(ctx, pipelineRollout) } } + // TODO: clean up old pipeline when drained default: if pipelineNeedsToUpdate && upgradeStrategyType == apiv1.UpgradeStrategyApply { if err := updatePipelineSpec(ctx, r.restConfig, newPipelineDef); err != nil { @@ -769,7 +770,10 @@ func (r *PipelineRolloutReconciler) calPipelineNameSuffix(ctx context.Context, p } } - return "-" + fmt.Sprint(*pipelineRollout.Status.NameCount), nil + preNameCount := *pipelineRollout.Status.NameCount + *pipelineRollout.Status.NameCount++ + + return "-" + fmt.Sprint(preNameCount), nil } func (r *PipelineRolloutReconciler) makeRunningPipelineDefinition( diff --git a/internal/controller/pipelinerollout_controller_test.go b/internal/controller/pipelinerollout_controller_test.go index e3427cba..885929f9 100644 --- a/internal/controller/pipelinerollout_controller_test.go +++ b/internal/controller/pipelinerollout_controller_test.go @@ -947,3 +947,115 @@ func Test_processExistingPipeline_PPND(t *testing.T) { }) } } + +// process an existing pipeline in this test, the user preferred strategy is Progressive +func Test_processExistingPipeline_Progressive(t *testing.T) { + restConfig, numaflowClientSet, numaplaneClient, _, err := commontest.PrepareK8SEnvironment() + assert.Nil(t, err) + + config.GetConfigManagerInstance().UpdateUSDEConfig(config.USDEConfig{ + DefaultUpgradeStrategy: config.ProgressiveStrategyID, + PipelineSpecExcludedPaths: []string{"watermark", "lifecycle"}, + }) + + ctx := context.Background() + + // other tests may call this, but it fails if called more than once + if customMetrics == nil { + customMetrics = metrics.RegisterCustomMetrics() + } + + recorder := record.NewFakeRecorder(64) + + r := NewPipelineRolloutReconciler( + numaplaneClient, + scheme.Scheme, + restConfig, + customMetrics, + recorder) + + testCases := []struct { + name string + newPipelineSpec numaflowv1.PipelineSpec + 
existingPipelineDef numaflowv1.Pipeline + initialRolloutPhase apiv1.Phase + initialInProgressStrategy *apiv1.UpgradeStrategy + numaflowControllerPauseRequest *bool + isbServicePauseRequest *bool + + expectedInProgressStrategy apiv1.UpgradeStrategy + expectedRolloutPhase apiv1.Phase + // require these Conditions to be set (note that in real life, previous reconciliations may have set other Conditions from before which are still present) + expectedPipelineSpecResult func(numaflowv1.PipelineSpec) bool + }{ + { + name: "spec difference results in Progressive", + newPipelineSpec: pipelineSpecWithTopologyChange, + existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true), + initialRolloutPhase: apiv1.PhaseDeployed, + initialInProgressStrategy: nil, + expectedInProgressStrategy: apiv1.UpgradeStrategyProgressive, + expectedRolloutPhase: apiv1.PhasePending, + expectedPipelineSpecResult: func(spec numaflowv1.PipelineSpec) bool { + return reflect.DeepEqual(pipelineWithDesiredPhase(pipelineSpec, numaflowv1.PipelinePhasePaused), spec) + }, + }, + } + + for _, tc := range testCases { + + t.Run(tc.name, func(t *testing.T) { + + // first delete Pipeline and PipelineRollout in case they already exist, in Kubernetes + _ = numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Delete(ctx, defaultPipelineName, metav1.DeleteOptions{}) + + pipelineList, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).List(ctx, metav1.ListOptions{}) + assert.NoError(t, err) + assert.Len(t, pipelineList.Items, 0) + + rollout := createPipelineRollout(tc.newPipelineSpec) + _ = numaplaneClient.Delete(ctx, rollout) + + rollout.Status.Phase = tc.initialRolloutPhase + if tc.initialInProgressStrategy != nil { + rollout.Status.UpgradeInProgress = *tc.initialInProgressStrategy + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultPipelineRolloutName}, *tc.initialInProgressStrategy) + } else { + 
rollout.Status.UpgradeInProgress = apiv1.UpgradeStrategyNoOp + r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultPipelineRolloutName}, apiv1.UpgradeStrategyNoOp) + } + + // the Reconcile() function does this, so we need to do it before calling reconcile() as well + rollout.Status.Init(rollout.Generation) + + err = numaplaneClient.Create(ctx, rollout) + assert.NoError(t, err) + + // create the already-existing Pipeline in Kubernetes + // this updates everything but the Status subresource + existingPipelineDef := &tc.existingPipelineDef + existingPipelineDef.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(rollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)} + pipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Create(ctx, existingPipelineDef, metav1.CreateOptions{}) + assert.NoError(t, err) + // update Status subresource + pipeline.Status = tc.existingPipelineDef.Status + _, err = numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).UpdateStatus(ctx, pipeline, metav1.UpdateOptions{}) + assert.NoError(t, err) + + _, err = r.reconcile(context.Background(), rollout, time.Now()) + assert.NoError(t, err) + + ////// check results: + // Check Phase of Rollout: + assert.Equal(t, tc.expectedRolloutPhase, rollout.Status.Phase) + // Check In-Progress Strategy + assert.Equal(t, tc.expectedInProgressStrategy, rollout.Status.UpgradeInProgress) + + // Check Pipeline spec + resultPipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Get(ctx, defaultPipelineName, metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, resultPipeline) + assert.True(t, tc.expectedPipelineSpecResult(resultPipeline.Spec), "result spec", fmt.Sprint(resultPipeline.Spec)) + }) + } +} diff --git a/internal/controller/pipelinerollout_progressive.go b/internal/controller/pipelinerollout_progressive.go index 6f7cca6c..3279fad6 100644 --- 
a/internal/controller/pipelinerollout_progressive.go +++ b/internal/controller/pipelinerollout_progressive.go @@ -3,20 +3,24 @@ package controller import ( "context" "fmt" + numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/rest" + "github.com/numaproj/numaplane/internal/common" "github.com/numaproj/numaplane/internal/util/kubernetes" "github.com/numaproj/numaplane/internal/util/logger" apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1" - apierrors "k8s.io/apimachinery/pkg/api/errors" ) +// processExistingPipelineWithProgressive should be only called when determined there is +// an update required and should use progressive strategy. func (r *PipelineRolloutReconciler) processExistingPipelineWithProgressive( ctx context.Context, pipelineRollout *apiv1.PipelineRollout, - newPipelineDef *kubernetes.GenericObject, pipelineNeedsToUpdate bool, + existingPipelineDef *kubernetes.GenericObject, ) (bool, error) { numaLogger := logger.FromContext(ctx) - newUpgradingPipelineDef, err := r.makeUpgradingPipelineDefinition(ctx, pipelineRollout) if err != nil { return false, err @@ -28,27 +32,22 @@ func (r *PipelineRolloutReconciler) processExistingPipelineWithProgressive( // create object as it doesn't exist if apierrors.IsNotFound(err) { - //pipelineRollout.Status.MarkPending() - numaLogger.Debugf("Upgrading Pipeline %s/%s doesn't exist so creating", newUpgradingPipelineDef.Namespace, newUpgradingPipelineDef.Name) err = kubernetes.CreateCR(ctx, r.restConfig, newUpgradingPipelineDef, "pipelines") if err != nil { return false, err } - //pipelineRollout.Status.MarkDeployed(pipelineRollout.Generation) - //r.customMetrics.ReconciliationDuration.WithLabelValues(ControllerPipelineRollout, "create").Observe(time.Since(syncStartTime).Seconds()) - return false, nil + } else { + return false, fmt.Errorf("error getting Pipeline: %v", err) } - - return false, fmt.Errorf("error getting 
Pipeline: %v", err) } - err = r.processUpgradingPipelineStatus(ctx, pipelineRollout) + done, err := r.processUpgradingPipelineStatus(ctx, pipelineRollout, existingPipelineDef) if err != nil { return false, err } - return true, nil + return done, nil } func (r *PipelineRolloutReconciler) makeUpgradingPipelineDefinition( @@ -71,40 +70,80 @@ func (r *PipelineRolloutReconciler) makeUpgradingPipelineDefinition( func (r *PipelineRolloutReconciler) processUpgradingPipelineStatus( ctx context.Context, pipelineRollout *apiv1.PipelineRollout, -) error { + existingPipelineDef *kubernetes.GenericObject, +) (bool, error) { numaLogger := logger.FromContext(ctx) pipelineDef, err := r.makeUpgradingPipelineDefinition(ctx, pipelineRollout) if err != nil { - return err + return false, err } // Get existing upgrading Pipeline existingUpgradingPipelineDef, err := kubernetes.GetCR(ctx, r.restConfig, pipelineDef, "pipelines") if err != nil { if apierrors.IsNotFound(err) { - numaLogger.WithValues("pipelineDefinition", *pipelineDef).Warn("Pipeline not found. Unable to process status during this reconciliation.") + numaLogger.WithValues("pipelineDefinition", *pipelineDef). + Warn("Pipeline not found. 
Unable to process status during this reconciliation.") } else { - return fmt.Errorf("error getting Pipeline for status processing: %v", err) + return false, fmt.Errorf("error getting Pipeline for status processing: %v", err) } } pipelineStatus, err := kubernetes.ParseStatus(existingUpgradingPipelineDef) if err != nil { - return fmt.Errorf("failed to parse Pipeline Status from pipeline CR: %+v, %v", existingUpgradingPipelineDef, err) + return false, fmt.Errorf("failed to parse Pipeline Status from pipeline CR: %+v, %v", existingUpgradingPipelineDef, err) } pipelinePhase := numaflowv1.PipelinePhase(pipelineStatus.Phase) if pipelinePhase == numaflowv1.PipelinePhaseFailed { - // pipelineRO.status = "PROGRESSIVE failed" + pipelineRollout.Status.MarkPipelineProgressiveUpgradeFailed("New Pipeline Failed", pipelineRollout.Generation) + return false, nil } else if pipelinePhase == numaflowv1.PipelinePhaseRunning { - // TODO: label the new pipeline as promoted - // pause pipeline_old + // Label the new pipeline as promoted and then remove the label from the old pipeline, + // since per PipelineRollout is reconciled only once at a time, we do not + // need to worry about consistency issue. 
+ err = r.updatePipelineLabel(ctx, r.restConfig, existingUpgradingPipelineDef, string(common.LabelValueUpgradePromoted)) + if err != nil { + return false, err + } + + err = r.updatePipelineLabel(ctx, r.restConfig, existingPipelineDef, "") + if err != nil { + return false, err + } + + pipelineRollout.Status.MarkPipelineProgressiveUpgradeSucceeded("New Pipeline Running", pipelineRollout.Generation) + pipelineRollout.Status.MarkDeployed(pipelineRollout.Generation) + + // TODO: pause old pipeline + return true, nil } else { - // TODO: ensure the latest pipeline spec is applied - // apply pipeline_new + // Ensure the latest pipeline spec is applied + err = kubernetes.UpdateCR(ctx, r.restConfig, pipelineDef, "pipelines") + if err != nil { + return false, err + } //continue (re-enqueue) + return false, nil } +} +func (r *PipelineRolloutReconciler) updatePipelineLabel( + ctx context.Context, + restConfig *rest.Config, + pipeline *kubernetes.GenericObject, + updateState string, +) error { + labelMapping := pipeline.Labels + labelMapping[common.LabelKeyUpgradeState] = updateState + pipeline.Labels = labelMapping + + // TODO: use patch instead + err := kubernetes.UpdateCR(ctx, restConfig, pipeline, "pipelines") + if err != nil { + return err + } return nil + } diff --git a/pkg/apis/numaplane/v1alpha1/pipelinerollout_types.go b/pkg/apis/numaplane/v1alpha1/pipelinerollout_types.go index 9f41822d..eb0ceb38 100644 --- a/pkg/apis/numaplane/v1alpha1/pipelinerollout_types.go +++ b/pkg/apis/numaplane/v1alpha1/pipelinerollout_types.go @@ -24,6 +24,9 @@ import ( const ( // ConditionPipelinePausingOrPaused indicates that the Pipeline is either pausing or paused. ConditionPipelinePausingOrPaused ConditionType = "PipelinePausingOrPaused" + + // ConditionPipelineProgressiveUpgradeSucceeded indicates that whether the progressive upgrade for the Pipeline succeeded. 
+ ConditionPipelineProgressiveUpgradeSucceeded ConditionType = "PipelineProgressiveUpgradeSucceeded" ) // PipelineRolloutSpec defines the desired state of PipelineRollout @@ -50,6 +53,7 @@ type PipelineRolloutStatus struct { } type UpgradeStrategy string +type UpgradeState string const ( UpgradeStrategyNoOp UpgradeStrategy = "" @@ -95,6 +99,14 @@ func (status *PipelineRolloutStatus) MarkPipelineUnpaused(generation int64) { status.MarkFalse(ConditionPipelinePausingOrPaused, "Unpaused", "Pipeline unpaused", generation) } +func (status *PipelineRolloutStatus) MarkPipelineProgressiveUpgradeSucceeded(message string, generation int64) { + status.MarkTrueWithReason(ConditionPipelineProgressiveUpgradeSucceeded, "Succeeded", message, generation) +} + +func (status *PipelineRolloutStatus) MarkPipelineProgressiveUpgradeFailed(message string, generation int64) { + status.MarkFalse(ConditionPipelineProgressiveUpgradeSucceeded, "Failed", message, generation) +} + func (status *PipelineRolloutStatus) SetUpgradeInProgress(upgradeStrategy UpgradeStrategy) { status.UpgradeInProgress = upgradeStrategy }