Skip to content

Commit

Permalink
feat: Allow pipeline to be temporarily marked to allow data loss (#325)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 authored Oct 7, 2024
1 parent 5543e9e commit 34da501
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 35 deletions.
4 changes: 4 additions & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (
// This is useful as a Label to quickly locate all Pipelines of a given PipelineRollout
LabelKeyPipelineRolloutForPipeline = "numaplane.numaproj.io/pipeline-rollout-name"

// LabelKeyAllowDataLoss is the label key on a Pipeline to indicate that PPND strategy can skip the usual pausing required
// this includes both the case of pausing for Pipeline updating as well as for NumaflowController and isbsvc updating
LabelKeyAllowDataLoss = "numaplane.numaproj.io/allow-data-loss"

// LabelKeyUpgradeState is the label key used to identify the upgrade state of a resource that is managed by
// a NumaRollout.
LabelKeyUpgradeState = "numaplane.numaproj.io/upgrade-state"
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (r *ISBServiceRolloutReconciler) processExistingISBService(ctx context.Cont
switch upgradeStrategy {
case config.PPNDStrategyID:

return processChildObjectWithPPND(ctx, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error {
return processChildObjectWithPPND(ctx, r.client, isbServiceRollout, r, isbServiceNeedsUpdating, isbServiceIsUpdating, func() error {
r.recorder.Eventf(isbServiceRollout, corev1.EventTypeNormal, "PipelinesPaused", "All Pipelines have paused for ISBService update")
err = r.updateISBService(ctx, isbServiceRollout, newISBServiceDef)
if err != nil {
Expand Down
37 changes: 25 additions & 12 deletions internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/client-go/tools/record"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
"github.com/numaproj/numaplane/internal/util"
"github.com/numaproj/numaplane/internal/util/metrics"
Expand Down Expand Up @@ -274,7 +275,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec numaflowv1.InterStepBufferServiceSpec
existingISBSvcDef *numaflowv1.InterStepBufferService
existingStatefulSetDef *appsv1.StatefulSet
existingPipelinePhase numaflowv1.PipelinePhase
existingPipeline *numaflowv1.Pipeline
expectedRolloutPhase apiv1.Phase
// require these Conditions to be set (note that in real life, previous reconciliations may have set other Conditions from before which are still present)
expectedConditionsSet map[apiv1.ConditionType]metav1.ConditionStatus
Expand All @@ -285,7 +286,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: nil,
existingStatefulSetDef: nil,
existingPipelinePhase: numaflowv1.PipelinePhaseRunning,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
Expand All @@ -297,7 +298,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelinePhase: numaflowv1.PipelinePhaseRunning,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{}, // some Conditions may be set from before, but in any case nothing new to verify
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
Expand All @@ -307,7 +308,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelinePhase: numaflowv1.PipelinePhaseRunning,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseRunning, map[string]string{}),
expectedRolloutPhase: apiv1.PhasePending,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{apiv1.ConditionPausingPipelines: metav1.ConditionTrue},
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.3"),
Expand All @@ -317,7 +318,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelinePhase: numaflowv1.PipelinePhasePaused,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
Expand All @@ -330,7 +331,19 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipelinePhase: numaflowv1.PipelinePhaseFailed,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhaseFailed, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
},
expectedISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
},
{
name: "existing ISBService - new spec - pipelines set to allow data loss",
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.3", numaflowv1.ISBSvcPhasePending, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", true),
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePausing, map[string]string{common.LabelKeyAllowDataLoss: "true"}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionChildResourceDeployed: metav1.ConditionTrue,
Expand All @@ -342,7 +355,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, false),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.3", false),
existingPipelinePhase: numaflowv1.PipelinePhasePaused,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionPausingPipelines: metav1.ConditionTrue,
Expand All @@ -354,7 +367,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
newISBSvcSpec: createDefaultISBServiceSpec("2.10.11"),
existingISBSvcDef: createDefaultISBService("2.10.11", numaflowv1.ISBSvcPhaseRunning, true),
existingStatefulSetDef: createDefaultISBStatefulSet("2.10.11", true),
existingPipelinePhase: numaflowv1.PipelinePhasePaused,
existingPipeline: createDefaultPipelineOfPhase(numaflowv1.PipelinePhasePaused, map[string]string{}),
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedConditionsSet: map[apiv1.ConditionType]metav1.ConditionStatus{
apiv1.ConditionPausingPipelines: metav1.ConditionFalse,
Expand Down Expand Up @@ -409,10 +422,9 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {
}

// create the Pipeline beforehand in Kubernetes, this updates everything but the Status subresource
newPipeline := createDefaultPipelineOfPhase(tc.existingPipelinePhase)
pipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Create(ctx, newPipeline, metav1.CreateOptions{})
pipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Create(ctx, tc.existingPipeline, metav1.CreateOptions{})
assert.NoError(t, err)
pipeline.Status = newPipeline.Status
pipeline.Status = tc.existingPipeline.Status

// updating the Status subresource is a separate operation
_, err = numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).UpdateStatus(ctx, pipeline, metav1.UpdateOptions{})
Expand Down Expand Up @@ -546,7 +558,7 @@ func createISBServiceRollout(isbsvcSpec numaflowv1.InterStepBufferServiceSpec) *
}
}

func createDefaultPipelineOfPhase(phase numaflowv1.PipelinePhase) *numaflowv1.Pipeline {
func createDefaultPipelineOfPhase(phase numaflowv1.PipelinePhase, annotations map[string]string) *numaflowv1.Pipeline {
return &numaflowv1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerollout-test",
Expand All @@ -555,6 +567,7 @@ func createDefaultPipelineOfPhase(phase numaflowv1.PipelinePhase) *numaflowv1.Pi
CreationTimestamp: metav1.NewTime(time.Now()),
Generation: 1,
Labels: map[string]string{"numaplane.numaproj.io/isbsvc-name": "isbservicerollout-test"},
Annotations: annotations,
},
Spec: numaflowv1.PipelineSpec{
InterStepBufferServiceName: defaultISBSvcRolloutName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (r *NumaflowControllerRolloutReconciler) reconcile(
controllerRollout.Status.MarkDeployed(controllerRollout.Generation)
}

needsRequeue, err := processChildObjectWithPPND(ctx, controllerRollout, r, controllerDeploymentNeedsUpdating,
needsRequeue, err := processChildObjectWithPPND(ctx, r.client, controllerRollout, r, controllerDeploymentNeedsUpdating,
controllerDeploymentIsUpdating, func() error {
r.recorder.Eventf(controllerRollout, corev1.EventTypeNormal, "AllPipelinesPaused", "All Pipelines have paused so Numaflow Controller can safely update")
phase, err := r.sync(controllerRollout, namespace, numaLogger)
Expand Down
19 changes: 12 additions & 7 deletions internal/controller/pause_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -28,7 +29,7 @@ type PauseRequester interface {
// return:
// - true if needs a requeue
// - error if any (note we'll automatically reuqueue if there's an error anyway)
func processChildObjectWithPPND(ctx context.Context, rollout client.Object, pauseRequester PauseRequester,
func processChildObjectWithPPND(ctx context.Context, k8sclient client.Client, rollout client.Object, pauseRequester PauseRequester,
resourceNeedsUpdating bool, resourceIsUpdating bool, updateFunc func() error) (bool, error) {
numaLogger := logger.FromContext(ctx)

Expand All @@ -49,7 +50,7 @@ func processChildObjectWithPPND(ctx context.Context, rollout client.Object, paus
if !pauseRequestUpdated && resourceNeedsUpdating {

// check if the pipelines are all paused (or can't be paused)
allPaused, err := areAllPipelinesPausedOrUnpausible(ctx, pauseRequester, rolloutNamespace, rolloutName)
allPaused, err := areAllPipelinesPausedOrWontPause(ctx, k8sclient, pauseRequester, rolloutNamespace, rolloutName)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -102,20 +103,24 @@ func requestPipelinesPause(ctx context.Context, pauseRequester PauseRequester, r
}

// check if all Pipelines corresponding to this Rollout have paused or are otherwise not pausible (contract with Numaflow is that this is Pipelines which are "Failed")
func areAllPipelinesPausedOrUnpausible(ctx context.Context, pauseRequester PauseRequester, rolloutNamespace string, rolloutName string) (bool, error) {
// or have an exception for allowing data loss
func areAllPipelinesPausedOrWontPause(ctx context.Context, k8sClient client.Client, pauseRequester PauseRequester, rolloutNamespace string, rolloutName string) (bool, error) {
numaLogger := logger.FromContext(ctx)
pipelines, err := pauseRequester.getPipelineList(ctx, rolloutNamespace, rolloutName)
if err != nil {
return false, err
}
for _, pipeline := range pipelines {
status, err := kubernetes.ParseStatus(pipeline)
if err != nil {

// Get PipelineRollout CR
pipelineRolloutName := getPipelineRolloutName(pipeline.Name)
pipelineRollout := &apiv1.PipelineRollout{}
if err := k8sClient.Get(ctx, k8stypes.NamespacedName{Namespace: rolloutNamespace, Name: pipelineRolloutName}, pipelineRollout); err != nil {
return false, err
}
if status.Phase != "Paused" && status.Phase != "Failed" {
numaLogger.Debugf("pipeline %q has status.phase=%q", pipeline.Name, status.Phase)

if !isPipelinePausedOrWontPause(ctx, pipeline, pipelineRollout) {
numaLogger.Debugf("pipeline %q not paused or won't pause", pipeline.Name)
return false, nil
}
}
Expand Down
29 changes: 25 additions & 4 deletions internal/controller/pipelinerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controller
import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -649,7 +648,7 @@ func TestPipelineLabels(t *testing.T) {
}
}

func createPipelineRollout(isbsvcSpec numaflowv1.PipelineSpec) *apiv1.PipelineRollout {
func createPipelineRollout(isbsvcSpec numaflowv1.PipelineSpec, annotations map[string]string, labels map[string]string) *apiv1.PipelineRollout {
pipelineRaw, _ := json.Marshal(isbsvcSpec)
return &apiv1.PipelineRollout{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -658,6 +657,8 @@ func createPipelineRollout(isbsvcSpec numaflowv1.PipelineSpec) *apiv1.PipelineRo
UID: "uid",
CreationTimestamp: metav1.NewTime(time.Now()),
Generation: 1,
Annotations: annotations,
Labels: labels,
},
Spec: apiv1.PipelineRolloutSpec{
Pipeline: apiv1.Pipeline{
Expand Down Expand Up @@ -731,6 +732,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
testCases := []struct {
name string
newPipelineSpec numaflowv1.PipelineSpec
pipelineRolloutAnnotations map[string]string
existingPipelineDef numaflowv1.Pipeline
initialRolloutPhase apiv1.Phase
initialInProgressStrategy *apiv1.UpgradeStrategy
Expand Down Expand Up @@ -856,6 +858,21 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
return reflect.DeepEqual(pipelineWithDesiredPhase(pipelineSpecWithTopologyChange, numaflowv1.PipelinePhaseRunning), spec)
},
},
{
name: "Pipeline stuck pausing, allow-data-loss annotation applied",
newPipelineSpec: pipelineSpecWithTopologyChange,
pipelineRolloutAnnotations: map[string]string{common.LabelKeyAllowDataLoss: "true"},
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhasePausing),
initialRolloutPhase: apiv1.PhasePending,
initialInProgressStrategy: &ppndUpgradeStrategy,
numaflowControllerPauseRequest: &trueValue,
isbServicePauseRequest: &trueValue,
expectedInProgressStrategy: apiv1.UpgradeStrategyNoOp,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedPipelineSpecResult: func(spec numaflowv1.PipelineSpec) bool {
return reflect.DeepEqual(pipelineSpecWithTopologyChange, spec)
},
},
}

for _, tc := range testCases {
Expand All @@ -869,7 +886,11 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, pipelineList.Items, 0)

rollout := createPipelineRollout(tc.newPipelineSpec)
if tc.pipelineRolloutAnnotations == nil {
tc.pipelineRolloutAnnotations = map[string]string{}
}

rollout := createPipelineRollout(tc.newPipelineSpec, tc.pipelineRolloutAnnotations, map[string]string{})
_ = numaplaneClient.Delete(ctx, rollout)

rollout.Status.Phase = tc.initialRolloutPhase
Expand Down Expand Up @@ -920,7 +941,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
resultPipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Get(ctx, defaultPipelineName, metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, resultPipeline)
assert.True(t, tc.expectedPipelineSpecResult(resultPipeline.Spec), "result spec", fmt.Sprint(resultPipeline.Spec))
assert.True(t, tc.expectedPipelineSpecResult(resultPipeline.Spec), "result spec", resultPipeline.Spec)
})
}
}
Loading

0 comments on commit 34da501

Please sign in to comment.