diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 7aee0f6567..3e0a475590 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -124,12 +124,20 @@ func init() { } } +type NoSchedulerFoundError struct{} + +func (NoSchedulerFoundError) Error() string { + return "no scheduler found" +} + +var _ error = NoSchedulerFoundError{} + type Scheduler struct { scheduler.Scheduler SchedulerConfig } -type schedulerFunc func(s string) Scheduler +type schedulerFunc func(s string) (Scheduler, bool) type Reconciler struct { SchedulerFunc schedulerFunc @@ -231,7 +239,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum cg.Spec.Replicas = pointer.Int32(0) err := r.schedule(ctx, cg) //de-schedule placements - if err != nil { + if err != nil && !errors.Is(err, NoSchedulerFoundError{}) { // return an error to 1. update the status. 2. not clear the finalizer return cg.MarkScheduleConsumerFailed("Deschedule", fmt.Errorf("failed to unschedule consumer group: %w", err)) } @@ -405,7 +413,15 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr startTime := time.Now() defer recordScheduleLatency(ctx, cg, startTime) - statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind) + resourceRef := cg.GetUserFacingResourceRef() + if resourceRef == nil { + return NoSchedulerFoundError{} + } + + statefulSetScheduler, ok := r.SchedulerFunc(resourceRef.Kind) + if !ok { + return NoSchedulerFoundError{} + } // Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing // volumes. @@ -419,7 +435,9 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr return cg.MarkScheduleConsumerFailed("Schedule", err) } // Sort placements by pod name. - sort.SliceStable(placements, func(i, j int) bool { return placements[i].PodName < placements[j].PodName }) + sort.SliceStable(placements, func(i, j int) bool { + return placements[i].PodName < placements[j].PodName + }) cg.Status.Placements = placements diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index a4fd15e19f..98589d7f74 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -69,6 +69,7 @@ func (f SchedulerFunc) Schedule(vpod scheduler.VPod) ([]eventingduckv1alpha1.Pla const ( testSchedulerKey = "scheduler" + noTestScheduler = "no-scheduler" systemNamespace = "knative-eventing" finalizerName = "consumergroups.internal.kafka.eventing.knative.dev" @@ -1673,14 +1674,14 @@ func TestReconcileKind(t *testing.T) { store.OnConfigChanged(exampleConfig) r := &Reconciler{ - SchedulerFunc: func(s string) Scheduler { + SchedulerFunc: func(s string) (Scheduler, bool) { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ Scheduler: ss, SchedulerConfig: SchedulerConfig{ StatefulSetName: kafkainternals.SourceStatefulSetName, }, - } + }, true }, ConsumerLister: listers.GetConsumerLister(), InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(), @@ -1820,11 +1821,11 @@ func TestReconcileKindNoAutoscaler(t *testing.T) { ctx, _ = kedaclient.With(ctx) r := &Reconciler{ - SchedulerFunc: func(s string) Scheduler { + SchedulerFunc: func(s string) (Scheduler, bool) { ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ Scheduler: ss, - } + }, true }, ConsumerLister: listers.GetConsumerLister(), InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(), @@ -2009,6 +2010,86 @@ func TestFinalizeKind(t *testing.T) { }, SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it }, + { + Name: "Finalize normal - with consumers, no valid scheduler", + Objects: []runtime.Object{ + NewSASLSSLSecret(ConsumerGroupNamespace, SecretName), + NewConsumer(1, + ConsumerSpec(NewConsumerSpec( + ConsumerTopics("t1", "t2"), + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerVReplicas(1), + ConsumerPlacement(kafkainternals.PodBind{PodName: "p1", PodNamespace: systemNamespace}), + ConsumerSubscriber(NewSourceSinkReference()), + )), + ), + NewDeletedConsumeGroup( + ConsumerGroupReplicas(1), + ConsumerGroupOwnerRef(SourceAsOwnerReference()), + ConsumerGroupConsumerSpec(NewConsumerSpec( + ConsumerConfigs( + ConsumerBootstrapServersConfig(ChannelBootstrapServers), + ConsumerGroupIdConfig("my.group.id"), + ), + ConsumerAuth(&kafkainternals.Auth{ + NetSpec: &bindings.KafkaNetSpec{ + SASL: bindings.KafkaSASLSpec{ + Enable: true, + User: bindings.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: SecretName, + }, + Key: "user", + }, + }, + Password: bindings.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: SecretName, + }, + Key: "password", + }, + }, + Type: bindings.SecretValueFromSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: SecretName, + }, + Key: "type", + }, + }, + }, + TLS: bindings.KafkaTLSSpec{ + Enable: true, + }, + }, + }), + )), + ), + }, + Key: testKey, + OtherTestData: map[string]interface{}{ + noTestScheduler: true, + }, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: ConsumerGroupNamespace, + Resource: schema.GroupVersionResource{ + Group: kafkainternals.SchemeGroupVersion.Group, + Version: kafkainternals.SchemeGroupVersion.Version, + Resource: "consumers", + }, + }, + Name: fmt.Sprintf("%s-%d", ConsumerNamePrefix, 1), + }, + }, + SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it + }, { Name: "Finalize normal - with consumers and existing placements", Objects: []runtime.Object{ @@ -2230,11 +2311,14 @@ func TestFinalizeKind(t *testing.T) { errorOnDeleteKafkaCG := row.OtherTestData[kafkatesting.ErrorOnDeleteConsumerGroupTestKey] r := &Reconciler{ - SchedulerFunc: func(s string) Scheduler { + SchedulerFunc: func(s string) (Scheduler, bool) { + if noScheduler, ok := row.OtherTestData[noTestScheduler]; ok && noScheduler.(bool) == true { + return Scheduler{}, false + } ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler) return Scheduler{ Scheduler: ss, - } + }, true }, ConsumerLister: listers.GetConsumerLister(), InternalsClient: fakekafkainternalsclient.Get(ctx).InternalV1alpha1(), diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index ccf516828b..c6838cdf56 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -114,7 +114,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I } r := &Reconciler{ - SchedulerFunc: func(s string) Scheduler { return schedulers[strings.ToLower(s)] }, + SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok }, ConsumerLister: consumer.Get(ctx).Lister(), InternalsClient: internalsclient.Get(ctx).InternalV1alpha1(), SecretLister: secretinformer.Get(ctx).Lister(),