Skip to content

Commit

Permalink
Merge branch 'main' into usde-isb
Browse files Browse the repository at this point in the history
  • Loading branch information
dpadhiar authored Oct 2, 2024
2 parents 146dd36 + bd935fd commit df198ad
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 92 deletions.
9 changes: 0 additions & 9 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,15 +822,6 @@ func getPipelineChildResourceHealth(conditions []metav1.Condition) (metav1.Condi
return "True", ""
}

// checkChildResources scans conditions in order and reports the first one
// accepted by the predicate f.
//
// It returns (true, condition) for the first match. When no condition matches,
// including when conditions is empty, it returns false together with a
// zero-value metav1.Condition.
func checkChildResources(conditions []metav1.Condition, f func(metav1.Condition) bool) (bool, metav1.Condition) {
	for i := range conditions {
		if candidate := conditions[i]; f(candidate) {
			return true, candidate
		}
	}
	return false, metav1.Condition{}
}

func (r *PipelineRolloutReconciler) ErrorHandler(pipelineRollout *apiv1.PipelineRollout, err error, reason, msg string) {
r.customMetrics.PipelinesSyncFailed.WithLabelValues().Inc()
r.recorder.Eventf(pipelineRollout, corev1.EventTypeWarning, reason, msg+" %v", err.Error())
Expand Down
43 changes: 12 additions & 31 deletions internal/controller/pipelinerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,15 +669,10 @@ func createPipelineRollout(isbsvcSpec numaflowv1.PipelineSpec) *apiv1.PipelineRo
}
}

func createPipelineOfSpec(spec numaflowv1.PipelineSpec, phase numaflowv1.PipelinePhase, fullyReconciled bool) *numaflowv1.Pipeline {
func createPipelineOfSpec(spec numaflowv1.PipelineSpec, phase numaflowv1.PipelinePhase) *numaflowv1.Pipeline {
status := numaflowv1.PipelineStatus{
Phase: phase,
}
if fullyReconciled {
status.ObservedGeneration = 1
} else {
status.ObservedGeneration = 0
}
return &numaflowv1.Pipeline{
TypeMeta: metav1.TypeMeta{
Kind: "Pipeline",
Expand All @@ -693,8 +688,8 @@ func createPipelineOfSpec(spec numaflowv1.PipelineSpec, phase numaflowv1.Pipelin

}

func createDefaultPipeline(phase numaflowv1.PipelinePhase, fullyReconciled bool) *numaflowv1.Pipeline {
return createPipelineOfSpec(pipelineSpec, phase, fullyReconciled)
func createDefaultPipeline(phase numaflowv1.PipelinePhase) *numaflowv1.Pipeline {
return createPipelineOfSpec(pipelineSpec, phase)
}

func pipelineWithDesiredPhase(spec numaflowv1.PipelineSpec, phase numaflowv1.PipelinePhase) numaflowv1.PipelineSpec {
Expand Down Expand Up @@ -750,7 +745,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "nothing to do",
newPipelineSpec: pipelineSpec,
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -764,7 +759,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "direct apply",
newPipelineSpec: pipelineSpecWithWatermarkDisabled,
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -778,7 +773,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "spec difference results in PPND",
newPipelineSpec: pipelineSpecWithTopologyChange,
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -792,7 +787,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "external pause request at the same time as a DirectApply change",
newPipelineSpec: pipelineSpecWithWatermarkDisabled,
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &trueValue,
Expand All @@ -806,7 +801,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "user sets desiredPhase=Paused",
newPipelineSpec: pipelineWithDesiredPhase(pipelineSpec, numaflowv1.PipelinePhasePaused),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -822,7 +817,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
newPipelineSpec: pipelineWithDesiredPhase(pipelineSpec, numaflowv1.PipelinePhaseRunning),
existingPipelineDef: *createPipelineOfSpec(
pipelineWithDesiredPhase(pipelineSpec, numaflowv1.PipelinePhaseRunning),
numaflowv1.PipelinePhasePaused, true),
numaflowv1.PipelinePhasePaused),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -836,7 +831,7 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
{
name: "PPND in progress, spec not yet applied, pipeline not paused",
newPipelineSpec: pipelineSpecWithTopologyChange,
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning),
initialRolloutPhase: apiv1.PhasePending,
initialInProgressStrategy: &ppndUpgradeStrategy,
numaflowControllerPauseRequest: &falseValue,
Expand All @@ -848,23 +843,9 @@ func Test_processExistingPipeline_PPND(t *testing.T) {
},
},
{
name: "PPND in progress, spec applied, still being reconciled",
newPipelineSpec: pipelineSpecWithTopologyChange,
existingPipelineDef: *createPipelineOfSpec(pipelineWithDesiredPhase(pipelineSpecWithTopologyChange, numaflowv1.PipelinePhasePaused), numaflowv1.PipelinePhasePaused, false),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: &ppndUpgradeStrategy,
numaflowControllerPauseRequest: &falseValue,
isbServicePauseRequest: &falseValue,
expectedInProgressStrategy: apiv1.UpgradeStrategyPPND,
expectedRolloutPhase: apiv1.PhaseDeployed,
expectedPipelineSpecResult: func(spec numaflowv1.PipelineSpec) bool {
return reflect.DeepEqual(pipelineWithDesiredPhase(pipelineSpecWithTopologyChange, numaflowv1.PipelinePhasePaused), spec)
},
},
{
name: "PPND in progress, spec applied, done being reconciled",
name: "PPND in progress, spec applied",
newPipelineSpec: pipelineSpecWithTopologyChange,
existingPipelineDef: *createPipelineOfSpec(pipelineWithDesiredPhase(pipelineSpecWithTopologyChange, numaflowv1.PipelinePhasePaused), numaflowv1.PipelinePhasePaused, true),
existingPipelineDef: *createPipelineOfSpec(pipelineWithDesiredPhase(pipelineSpecWithTopologyChange, numaflowv1.PipelinePhasePaused), numaflowv1.PipelinePhasePaused),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: &ppndUpgradeStrategy,
numaflowControllerPauseRequest: &falseValue,
Expand Down
44 changes: 5 additions & 39 deletions internal/controller/pipelinerollout_ppnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ import (
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TODO: move PPND logic out to its own separate file
// normal sequence of events when we need to pause:
// - set Pipeline's desiredPhase=Paused
// - wait for the desire to Pause to be reconciled completely
// - if we need to update the Pipeline spec:
// - update it
// - wait for the spec update to be reconciled completely
// - wait for Pipeline to become Paused
// - then if we need to update the Pipeline spec, update it
//
// - as long as there's no other requirement to pause, set desiredPhase=Running
// return boolean for whether we can stop the PPND process
Expand Down Expand Up @@ -94,7 +91,6 @@ func (r *PipelineRolloutReconciler) processExistingPipelineWithPPND(ctx context.
// any difference in spec between PipelineRollout and Pipeline, with the exception of lifecycle.desiredPhase field
// any pause request coming from isbsvc or Numaflow Controller
// spec says to pause
// pipeline spec change is still being reconciled
//
// return whether to pause, not to pause, or otherwise unknown
func (r *PipelineRolloutReconciler) shouldBePaused(ctx context.Context, pipelineRollout *apiv1.PipelineRollout, existingPipelineDef, newPipelineDef *kubernetes.GenericObject, pipelineNeedsToUpdate bool) (*bool, error) {
Expand All @@ -109,17 +105,6 @@ func (r *PipelineRolloutReconciler) shouldBePaused(ctx context.Context, pipeline
return nil, fmt.Errorf("failed to convert existing Pipeline spec %q into PipelineSpec type, err=%v", string(existingPipelineDef.Spec.Raw), err)
}

// is the Pipeline currently being reconciled?
pipelineUpdating, err := pipelineIsUpdating(newPipelineDef, existingPipelineDef)
if err != nil {
return nil, err
}

// is the Pipeline currently being reconciled while our desiredPhase==Paused?
// only in this circumstance do we need to make sure to remain Paused until that reconciliation is complete
existingPipelinePauseDesired := existingPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePaused)
pipelineUpdating = pipelineUpdating && existingPipelinePauseDesired

// Is either Numaflow Controller or ISBService trying to update (such that we need to pause)?
externalPauseRequest, pauseRequestsKnown, err := r.checkForPauseRequest(ctx, pipelineRollout, getISBSvcName(newPipelineSpec))
if err != nil {
Expand All @@ -131,9 +116,9 @@ func (r *PipelineRolloutReconciler) shouldBePaused(ctx context.Context, pipeline

unpausible := checkPipelineStatus(ctx, existingPipelineDef, numaflowv1.PipelinePhaseFailed)

shouldBePaused := (pipelineNeedsToUpdate || pipelineUpdating || externalPauseRequest || specBasedPause) && !unpausible
numaLogger.Debugf("shouldBePaused=%t, pipelineNeedsToUpdate=%t, pipelineUpdating=%t, externalPauseRequest=%t, specBasedPause=%t, unpausible=%t",
shouldBePaused, pipelineNeedsToUpdate, pipelineUpdating, externalPauseRequest, specBasedPause, unpausible)
shouldBePaused := (pipelineNeedsToUpdate || externalPauseRequest || specBasedPause) && !unpausible
numaLogger.Debugf("shouldBePaused=%t, pipelineNeedsToUpdate=%t, externalPauseRequest=%t, specBasedPause=%t, unpausible=%t",
shouldBePaused, pipelineNeedsToUpdate, externalPauseRequest, specBasedPause, unpausible)

// if we have incomplete pause request information (i.e. numaflowcontrollerrollout or isbservicerollout not yet reconciled), don't return
// that it's okay to run
Expand Down Expand Up @@ -237,25 +222,6 @@ func (r *PipelineRolloutReconciler) setPipelineLifecycle(ctx context.Context, pa
return nil
}

// pipelineIsUpdating reports whether the Pipeline (or its children) is still
// in the process of being reconciled.
//
// The result is true when either:
//   - pipelineObservedGenerationCurrent reports that the status'
//     ObservedGeneration is not current relative to newPipelineDef.Generation
//     (i.e. Numaflow Controller hasn't even seen the generation change yet), or
//   - at least one of the existing Pipeline's status conditions has
//     Status == metav1.ConditionFalse.
//
// An error is returned only when the existing Pipeline's status cannot be
// parsed; in that case the boolean is false.
func pipelineIsUpdating(newPipelineDef *kubernetes.GenericObject, existingPipelineDef *kubernetes.GenericObject) (bool, error) {
	status, parseErr := kubernetes.ParseStatus(existingPipelineDef)
	if parseErr != nil {
		return false, parseErr
	}

	// an old ObservedGeneration means the change has not been picked up yet,
	// so there is no point in looking at the conditions
	generationCurrent := pipelineObservedGenerationCurrent(newPipelineDef.Generation, status.ObservedGeneration)
	if !generationCurrent {
		return true, nil
	}

	// any condition reporting False is treated as children still being updated
	// (unhealthy or progressing)
	conditionIsFalse := func(c metav1.Condition) bool {
		return c.Status == metav1.ConditionFalse
	}
	childrenUpdating, _ := checkChildResources(status.Conditions, conditionIsFalse)

	return childrenUpdating, nil
}
func isPipelinePausedOrUnpausible(ctx context.Context, pipeline *kubernetes.GenericObject) bool {
// contract with Numaflow is that unpausible Pipelines are "Failed" pipelines
return checkPipelineStatus(ctx, pipeline, numaflowv1.PipelinePhasePaused) || checkPipelineStatus(ctx, pipeline, numaflowv1.PipelinePhaseFailed)
Expand Down
20 changes: 10 additions & 10 deletions tests/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ var _ = Describe("Functional e2e", Serial, func() {

verifyPipelineRolloutDeployed(pipelineRolloutName)
verifyPipelineRolloutHealthy(pipelineRolloutName)
verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)

verifyPipelineRunning(Namespace, pipelineName, 2)

Expand Down Expand Up @@ -319,7 +319,7 @@ var _ = Describe("Functional e2e", Serial, func() {
verifyPipelineRolloutDeployed(pipelineRolloutName)
verifyPipelineRolloutHealthy(pipelineRolloutName)

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)

verifyPipelineRunning(Namespace, pipelineName, 2)

Expand All @@ -342,7 +342,7 @@ var _ = Describe("Functional e2e", Serial, func() {
if dataLossPrevention == "true" {

document("Verify that in-progress-strategy gets set to PPND")
verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyPPND)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyPPND)

verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName)

Expand All @@ -361,7 +361,7 @@ var _ = Describe("Functional e2e", Serial, func() {
verifyPipelineRolloutDeployed(pipelineRolloutName)
verifyPipelineRolloutHealthy(pipelineRolloutName)

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)

verifyPipelineRunning(Namespace, pipelineName, 3)

Expand Down Expand Up @@ -401,7 +401,7 @@ var _ = Describe("Functional e2e", Serial, func() {
(retrievedPipelineStatus.Phase == numaflowv1.PipelinePhasePaused || retrievedPipelineStatus.Phase == numaflowv1.PipelinePhasePausing)
}, 1*time.Minute, testPollingInterval).Should(BeTrue())

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)

verifyPodsRunning(Namespace, 0, getVertexLabelSelector(pipelineName))
})
Expand All @@ -427,7 +427,7 @@ var _ = Describe("Functional e2e", Serial, func() {
verifyPipelineRolloutDeployed(pipelineRolloutName)
verifyPipelineRolloutHealthy(pipelineRolloutName)

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyPipelineRunning(Namespace, pipelineName, 3)
})

Expand All @@ -448,7 +448,7 @@ var _ = Describe("Functional e2e", Serial, func() {
if dataLossPrevention == "true" {

document("Verify that in-progress-strategy gets set to PPND")
verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyPPND)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyPPND)
verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName)

Eventually(func() bool {
Expand All @@ -473,7 +473,7 @@ var _ = Describe("Functional e2e", Serial, func() {

verifyNumaflowControllerReady(Namespace)

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyPipelineRunning(Namespace, pipelineName, 3)

})
Expand All @@ -496,7 +496,7 @@ var _ = Describe("Functional e2e", Serial, func() {
if dataLossPrevention == "true" {

document("Verify that in-progress-strategy gets set to PPND")
verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyPPND)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyPPND)
verifyPipelinePaused(Namespace, pipelineRolloutName, pipelineName)

Eventually(func() bool {
Expand All @@ -519,7 +519,7 @@ var _ = Describe("Functional e2e", Serial, func() {

verifyISBSvcReady(Namespace, isbServiceRolloutName, 3)

verifyInProgressStrategy(Namespace, pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyInProgressStrategy(pipelineRolloutName, apiv1.UpgradeStrategyNoOp)
verifyPipelineRunning(Namespace, pipelineName, 3)

})
Expand Down
7 changes: 4 additions & 3 deletions tests/e2e/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,16 @@ func verifyPipelinePaused(namespace string, pipelineRolloutName string, pipeline
}, testTimeout).Should(Equal(metav1.ConditionTrue))

document("Verify that Pipeline is paused and fully drained")
verifyPipelineStatusEventually(Namespace, pipelineName,
verifyPipelineStatusEventually(namespace, pipelineName,
func(retrievedPipelineSpec numaflowv1.PipelineSpec, retrievedPipelineStatus numaflowv1.PipelineStatus) bool {
return retrievedPipelineStatus.Phase == numaflowv1.PipelinePhasePaused && retrievedPipelineStatus.DrainedOnPause

})
verifyPodsRunning(namespace, 0, getVertexLabelSelector(pipelineName))
// this happens too fast to verify it:
//verifyPodsRunning(namespace, 0, getVertexLabelSelector(pipelineName))
}

func verifyInProgressStrategy(namespace string, pipelineRolloutName string, inProgressStrategy apiv1.UpgradeStrategy) {
func verifyInProgressStrategy(pipelineRolloutName string, inProgressStrategy apiv1.UpgradeStrategy) {
document("Verifying InProgressStrategy")
Eventually(func() bool {
rollout, _ := pipelineRolloutClient.Get(ctx, pipelineRolloutName, metav1.GetOptions{})
Expand Down

0 comments on commit df198ad

Please sign in to comment.