Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline No downtime Upgrade #327

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (r *PipelineRolloutReconciler) reconcile(
controllerutil.AddFinalizer(pipelineRollout, finalizerName)
}

newPipelineDef, err := r.makePipelineDefinition(ctx, pipelineRollout)
newPipelineDef, err := r.makeRunningPipelineDefinition(ctx, pipelineRollout)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -506,16 +506,15 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context,

case apiv1.UpgradeStrategyProgressive:
if pipelineNeedsToUpdate {
numaLogger.Error(errors.New("Progressive not supported"), "Progressive not supported")
//TODO (just an idea of what it might look like below...)
// done, err := r.processExistingPipelineWithProgressive(...)
//if err != nil {
// return err
//}
//if done {
// r.unsetInProgressStrategy(namespacedName)
//}
done, err := r.processExistingPipelineWithProgressive(ctx, pipelineRollout, existingPipelineDef)
if err != nil {
return err
}
if done {
r.inProgressStrategyMgr.unsetStrategy(ctx, pipelineRollout)
}
}
// TODO: clean up old pipeline when drained
default:
if pipelineNeedsToUpdate && upgradeStrategyType == apiv1.UpgradeStrategyApply {
if err := updatePipelineSpec(ctx, r.restConfig, newPipelineDef); err != nil {
Expand All @@ -539,7 +538,7 @@ func pipelineObservedGenerationCurrent(generation int64, observedGeneration int6
func (r *PipelineRolloutReconciler) processPipelineStatus(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) error {
numaLogger := logger.FromContext(ctx)

pipelineDef, err := r.makePipelineDefinition(ctx, pipelineRollout)
pipelineDef, err := r.makeRunningPipelineDefinition(ctx, pipelineRollout)
if err != nil {
return err
}
Expand Down Expand Up @@ -705,7 +704,7 @@ func updatePipelineSpec(ctx context.Context, restConfig *rest.Config, obj *kuber
return kubernetes.UpdateCR(ctx, restConfig, obj, "pipelines")
}

func pipelineLabels(pipelineRollout *apiv1.PipelineRollout) (map[string]string, error) {
func pipelineLabels(pipelineRollout *apiv1.PipelineRollout, upgradeState string) (map[string]string, error) {
var pipelineSpec PipelineSpec
labelMapping := map[string]string{
common.LabelKeyISBServiceNameForPipeline: "default",
Expand All @@ -718,7 +717,7 @@ func pipelineLabels(pipelineRollout *apiv1.PipelineRollout) (map[string]string,
}

labelMapping[common.LabelKeyPipelineRolloutForPipeline] = pipelineRollout.Name
labelMapping[common.LabelKeyUpgradeState] = string(common.LabelValueUpgradePromoted)
labelMapping[common.LabelKeyUpgradeState] = upgradeState

return labelMapping, nil
}
Expand All @@ -732,20 +731,24 @@ func (r *PipelineRolloutReconciler) updatePipelineRolloutStatusToFailed(ctx cont
}

// getPipelineName retrieves the name of the current running pipeline managed by the given
// pipelineRollout through the `promoted` label. If no such pipeline exists, then it
// constructs the name by calculating the suffix and appending to the PipelineRollout name.
func (r *PipelineRolloutReconciler) getPipelineName(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) (string, error) {
// pipelineRollout through the `promoted` label. If no such pipeline exists, the name is
// constructed by calculating the suffix and appending it to the PipelineRollout name.
func (r *PipelineRolloutReconciler) getPipelineName(
ctx context.Context,
pipelineRollout *apiv1.PipelineRollout,
upgradeState string,
) (string, error) {
pipelines, err := kubernetes.ListCR(
ctx, r.restConfig, common.NumaflowAPIGroup, common.NumaflowAPIVersion, "pipelines",
pipelineRollout.Namespace, fmt.Sprintf(
"%s=%s,%s=%s", common.LabelKeyPipelineRolloutForPipeline, pipelineRollout.Name,
common.LabelKeyUpgradeState, common.LabelValueUpgradePromoted,
common.LabelKeyPipelineRolloutForPipeline, upgradeState,
), "")
if err != nil {
return "", err
}
if len(pipelines) > 1 {
return "", fmt.Errorf("there should only be one promoted pipeline")
return "", fmt.Errorf("there should only be one promoted or upgrade in progress pipeline")
} else if len(pipelines) == 0 {
suffixName, err := r.calPipelineNameSuffix(ctx, pipelineRollout)
if err != nil {
Expand All @@ -767,19 +770,34 @@ func (r *PipelineRolloutReconciler) calPipelineNameSuffix(ctx context.Context, p
}
}

return "-" + fmt.Sprint(*pipelineRollout.Status.NameCount), nil
preNameCount := *pipelineRollout.Status.NameCount
*pipelineRollout.Status.NameCount++

return "-" + fmt.Sprint(preNameCount), nil
}

func (r *PipelineRolloutReconciler) makePipelineDefinition(ctx context.Context, pipelineRollout *apiv1.PipelineRollout) (*kubernetes.GenericObject, error) {
labels, err := pipelineLabels(pipelineRollout)
// makeRunningPipelineDefinition builds the Pipeline definition for the given PipelineRollout
// using the `promoted` upgrade state: it resolves the pipeline name through getPipelineName and
// the labels through pipelineLabels, both called with common.LabelValueUpgradePromoted, and then
// delegates construction of the object to makePipelineDefinition.
// Any error from the name or label lookup is returned unchanged with a nil object.
func (r *PipelineRolloutReconciler) makeRunningPipelineDefinition(
ctx context.Context,
pipelineRollout *apiv1.PipelineRollout,
) (*kubernetes.GenericObject, error) {
// look up (or construct) the pipeline name for the `promoted` upgrade state
pipelineName, err := r.getPipelineName(ctx, pipelineRollout, string(common.LabelValueUpgradePromoted))
if err != nil {
return nil, err
}
// NOTE(review): the next line appears to be the removed side of the diff (the pre-change
// two-argument getPipelineName call) rendered inline by the diff view - confirm.
pipelineName, err := r.getPipelineName(ctx, pipelineRollout)

// derive the labels with the same `promoted` upgrade state
labels, err := pipelineLabels(pipelineRollout, string(common.LabelValueUpgradePromoted))
if err != nil {
return nil, err
}

return r.makePipelineDefinition(pipelineRollout, pipelineName, labels)
}

func (r *PipelineRolloutReconciler) makePipelineDefinition(
pipelineRollout *apiv1.PipelineRollout,
pipelineName string,
labels map[string]string,
) (*kubernetes.GenericObject, error) {
return &kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "Pipeline",
Expand Down
122 changes: 119 additions & 3 deletions internal/controller/pipelinerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,24 +589,28 @@
tests := []struct {
name string
jsonInput string
upgradeState string
expectedLabel string
expectError bool
}{
{
name: "Valid Input",
jsonInput: `{"interStepBufferServiceName": "buffer-service"}`,
upgradeState: string(common.LabelValueUpgradePromoted),
expectedLabel: "buffer-service",
expectError: false,
},
{
name: "Missing InterStepBufferServiceName",
jsonInput: `{}`,
upgradeState: string(common.LabelValueUpgradePromoted),
expectedLabel: "default",
expectError: false,
},
{
name: "Invalid JSON",
jsonInput: `{"interStepBufferServiceName": "buffer-service"`,
upgradeState: string(common.LabelValueUpgradeInProgress),
expectedLabel: "",
expectError: true,
},
Expand All @@ -627,7 +631,7 @@
},
}

labels, err := pipelineLabels(pipelineRollout)
labels, err := pipelineLabels(pipelineRollout, tt.upgradeState)
if (err != nil) != tt.expectError {
t.Errorf("pipelineLabels() error = %v, expectError %v", err, tt.expectError)
return
Expand All @@ -641,8 +645,8 @@
t.Errorf("pipelineLabels() = %v, expected %v", common.LabelKeyPipelineRolloutForPipeline, pipelineRolloutName)
}

if labels[common.LabelKeyUpgradeState] != string(common.LabelValueUpgradePromoted) {
t.Errorf("pipelineLabels() = %v, expected %v", common.LabelKeyUpgradeState, string(common.LabelValueUpgradePromoted))
if labels[common.LabelKeyUpgradeState] != tt.upgradeState {
t.Errorf("pipelineLabels() = %v, expected %v", common.LabelKeyUpgradeState, tt.upgradeState)
}
}
})
Expand Down Expand Up @@ -943,3 +947,115 @@
})
}
}

// Test_processExistingPipeline_Progressive drives r.reconcile() against a PipelineRollout whose
// Pipeline already exists, with the USDE config's default upgrade strategy set to Progressive.
// For each test case it then checks the rollout's Phase, its UpgradeInProgress strategy, and the
// spec of the Pipeline fetched back through the numaflow client set.
func Test_processExistingPipeline_Progressive(t *testing.T) {
restConfig, numaflowClientSet, numaplaneClient, _, err := commontest.PrepareK8SEnvironment()
assert.Nil(t, err)

// make Progressive the default upgrade strategy for this test
config.GetConfigManagerInstance().UpdateUSDEConfig(config.USDEConfig{
DefaultUpgradeStrategy: config.ProgressiveStrategyID,
PipelineSpecExcludedPaths: []string{"watermark", "lifecycle"},
})

ctx := context.Background()

// other tests may call this, but it fails if called more than once
if customMetrics == nil {
customMetrics = metrics.RegisterCustomMetrics()
}

recorder := record.NewFakeRecorder(64)

r := NewPipelineRolloutReconciler(
numaplaneClient,
scheme.Scheme,
restConfig,
customMetrics,
recorder)

// NOTE(review): the lines from here up to the testCases declaration are a CI lint annotation
// captured together with the diff, not Go source.
Check failure on line 975 in internal/controller/pipelinerollout_controller_test.go

View workflow job for this annotation

GitHub Actions / Lint

too many arguments in call to createDefaultPipeline

testCases := []struct {
name string
newPipelineSpec numaflowv1.PipelineSpec
existingPipelineDef numaflowv1.Pipeline
initialRolloutPhase apiv1.Phase
initialInProgressStrategy *apiv1.UpgradeStrategy
// NOTE(review): the two pause-request fields below are not read anywhere in this test function.
numaflowControllerPauseRequest *bool
isbServicePauseRequest *bool

expectedInProgressStrategy apiv1.UpgradeStrategy
expectedRolloutPhase apiv1.Phase
// predicate applied to the spec of the Pipeline fetched after reconcile(); the case passes only if it returns true
expectedPipelineSpecResult func(numaflowv1.PipelineSpec) bool
}{
{
name: "spec difference results in Progressive",
newPipelineSpec: pipelineSpecWithTopologyChange,
// NOTE(review): the lint annotation captured above reports "too many arguments in call to
// createDefaultPipeline"; this is the only such call in this function - confirm the helper's signature.
existingPipelineDef: *createDefaultPipeline(numaflowv1.PipelinePhaseRunning, true),
initialRolloutPhase: apiv1.PhaseDeployed,
initialInProgressStrategy: nil,
expectedInProgressStrategy: apiv1.UpgradeStrategyProgressive,
expectedRolloutPhase: apiv1.PhasePending,
// pipelineSpec and pipelineWithDesiredPhase are defined outside this excerpt; the resulting spec
// must be deeply equal to what pipelineWithDesiredPhase returns for pipelineSpec and PipelinePhasePaused
expectedPipelineSpecResult: func(spec numaflowv1.PipelineSpec) bool {
return reflect.DeepEqual(pipelineWithDesiredPhase(pipelineSpec, numaflowv1.PipelinePhasePaused), spec)
},
},
}

for _, tc := range testCases {

t.Run(tc.name, func(t *testing.T) {

// first delete Pipeline and PipelineRollout in case they already exist, in Kubernetes
_ = numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Delete(ctx, defaultPipelineName, metav1.DeleteOptions{})

// verify that no Pipeline is left in the namespace before the case starts
pipelineList, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).List(ctx, metav1.ListOptions{})
assert.NoError(t, err)
assert.Len(t, pipelineList.Items, 0)

rollout := createPipelineRollout(tc.newPipelineSpec)
_ = numaplaneClient.Delete(ctx, rollout)

// seed the rollout status and the reconciler's in-progress strategy store with the case's
// initial strategy, falling back to NoOp when the case does not specify one
rollout.Status.Phase = tc.initialRolloutPhase
if tc.initialInProgressStrategy != nil {
rollout.Status.UpgradeInProgress = *tc.initialInProgressStrategy
r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultPipelineRolloutName}, *tc.initialInProgressStrategy)
} else {
rollout.Status.UpgradeInProgress = apiv1.UpgradeStrategyNoOp
r.inProgressStrategyMgr.store.setStrategy(k8stypes.NamespacedName{Namespace: defaultNamespace, Name: defaultPipelineRolloutName}, apiv1.UpgradeStrategyNoOp)
}

// the Reconcile() function does this, so we need to do it before calling reconcile() as well
rollout.Status.Init(rollout.Generation)

err = numaplaneClient.Create(ctx, rollout)
assert.NoError(t, err)

// create the already-existing Pipeline in Kubernetes
// this updates everything but the Status subresource
existingPipelineDef := &tc.existingPipelineDef
// mark the rollout as the controller owner of the existing Pipeline
existingPipelineDef.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(rollout.GetObjectMeta(), apiv1.PipelineRolloutGroupVersionKind)}
pipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Create(ctx, existingPipelineDef, metav1.CreateOptions{})
assert.NoError(t, err)
// update Status subresource
pipeline.Status = tc.existingPipelineDef.Status
_, err = numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).UpdateStatus(ctx, pipeline, metav1.UpdateOptions{})
assert.NoError(t, err)

// run the reconciliation under test; note it is given a fresh context.Background() rather than ctx
_, err = r.reconcile(context.Background(), rollout, time.Now())
assert.NoError(t, err)

////// check results:
// Check Phase of Rollout:
assert.Equal(t, tc.expectedRolloutPhase, rollout.Status.Phase)
// Check In-Progress Strategy
assert.Equal(t, tc.expectedInProgressStrategy, rollout.Status.UpgradeInProgress)

// Check Pipeline spec
resultPipeline, err := numaflowClientSet.NumaflowV1alpha1().Pipelines(defaultNamespace).Get(ctx, defaultPipelineName, metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, resultPipeline)
assert.True(t, tc.expectedPipelineSpecResult(resultPipeline.Spec), "result spec", fmt.Sprint(resultPipeline.Spec))
})
}
}
Loading
Loading