diff --git a/control-plane/cmd/kafka-controller/main.go b/control-plane/cmd/kafka-controller/main.go index d39dfce1b9..70a9df64b0 100644 --- a/control-plane/cmd/kafka-controller/main.go +++ b/control-plane/cmd/kafka-controller/main.go @@ -30,6 +30,7 @@ import ( "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" @@ -67,6 +68,7 @@ func main() { ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector, auth.OIDCLabelSelector, + eventing.DispatcherLabelSelectorStr, ) ctx = clientpool.WithKafkaClientPool(ctx) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/data_plane.go b/control-plane/pkg/apis/internals/kafka/eventing/data_plane.go index a2e822ceb9..f37c01af6e 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/data_plane.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/data_plane.go @@ -27,6 +27,11 @@ const ( ConfigMapVolumeName = "kafka-resources" DispatcherVolumeName = "contract-resources" + + DataPlanePodKindLabelKey = "app.kubernetes.io/kind" + DispatcherPodKindLabelValue = "kafka-dispatcher" + + DispatcherLabelSelectorStr = DataPlanePodKindLabelKey + "=" + DispatcherPodKindLabelValue ) func ConfigMapNameFromPod(p *corev1.Pod) (string, error) { diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go index 952058f633..5331d772d1 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/register.go @@ -17,6 +17,8 @@ package v1alpha1 import ( + "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -76,13 +78,14 @@ func IsKnownStatefulSet(name string) bool { name == BrokerStatefulSetName } -func GetOwnerKindFromStatefulSetName(name string) (string, bool) { - switch name { - case SourceStatefulSetName: +func GetOwnerKindFromStatefulSetPrefix(name string) (string, bool) { + if strings.HasPrefix(name, SourceStatefulSetName) { return "KafkaSource", true - case ChannelStatefulSetName: + } + if strings.HasPrefix(name, ChannelStatefulSetName) { return "KafkaChannel", true - case BrokerStatefulSetName: + } + if strings.HasPrefix(name, BrokerStatefulSetName) { return "Trigger", true } return "", false diff --git a/control-plane/pkg/reconciler/consumergroup/controller.go b/control-plane/pkg/reconciler/consumergroup/controller.go index 94e22ad04a..ecc96df556 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller.go +++ b/control-plane/pkg/reconciler/consumergroup/controller.go @@ -27,12 +27,14 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/tools/cache" + "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" "knative.dev/eventing-kafka-broker/control-plane/pkg/prober" @@ -42,7 +44,7 @@ import ( "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap" nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node" - podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered" secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -119,13 +121,15 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I clientPool := clientpool.Get(ctx) + dispatcherPodInformer := podinformer.Get(ctx, eventing.DispatcherLabelSelectorStr) + r := &Reconciler{ SchedulerFunc: func(s string) (Scheduler, bool) { sched, ok := schedulers[strings.ToLower(s)]; return sched, ok }, ConsumerLister: consumer.Get(ctx).Lister(), InternalsClient: internalsclient.Get(ctx).InternalV1alpha1(), SecretLister: secretinformer.Get(ctx).Lister(), ConfigMapLister: configmapinformer.Get(ctx).Lister(), - PodLister: podinformer.Get(ctx).Lister(), + PodLister: dispatcherPodInformer.Lister(), KubeClient: kubeclient.Get(ctx), NameGenerator: names.SimpleNameGenerator, GetKafkaClient: clientPool.GetClient, @@ -194,6 +198,47 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I return cg, ok }) + dispatcherPodInformer.Informer().AddEventHandler(controller.HandleAll(func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + + kind, ok := kafkainternals.GetOwnerKindFromStatefulSetPrefix(pod.Name) + if !ok { + return + } + + cmName, err := eventing.ConfigMapNameFromPod(pod) + if err != nil { + logger.Warnw("Failed to get ConfigMap name from pod", zap.String("pod", pod.Name), zap.Error(err)) + return + } + if err := r.ensureContractConfigMapExists(ctx, pod, cmName); err != nil { + logger.Warnw("Failed to ensure ConfigMap for pod exists", zap.String("pod", pod.Name), zap.String("configmap", cmName), zap.Error(err)) + return + } + + impl.FilteredGlobalResync( + func(obj interface{}) bool { + cg, ok := obj.(*kafkainternals.ConsumerGroup) + if !ok { + return false + } + + uf := cg.GetUserFacingResourceRef() + if uf == nil { + return false + } + if strings.EqualFold(kind, uf.Kind) { + return true + } + return false + }, + consumerGroupInformer.Informer(), + ) + })) + //Todo: ScaledObject informer when KEDA is installed return impl @@ -209,7 +254,7 @@ func ResyncOnStatefulSetChange(ctx context.Context, filteredResync func(f func(i return } - kind, ok := kafkainternals.GetOwnerKindFromStatefulSetName(ss.GetName()) + kind, ok := kafkainternals.GetOwnerKindFromStatefulSetPrefix(ss.GetName()) if !ok { return } diff --git a/control-plane/pkg/reconciler/consumergroup/controller_test.go b/control-plane/pkg/reconciler/consumergroup/controller_test.go index a56c942215..ef77e02884 100644 --- a/control-plane/pkg/reconciler/consumergroup/controller_test.go +++ b/control-plane/pkg/reconciler/consumergroup/controller_test.go @@ -17,24 +17,30 @@ package consumergroup import ( + "context" "testing" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" + _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" "knative.dev/pkg/configmap" reconcilertesting "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" + _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/sources/v1beta1/kafkasource/fake" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" + kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumer/fake" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/informers/eventing/v1alpha1/consumergroup/fake" @@ -55,7 +61,11 @@ const ( ) func TestNewController(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, func(ctx context.Context) context.Context { + return filteredFactory.WithSelectors(ctx, + eventing.DispatcherLabelSelectorStr, + ) + }) ctx, _ = kedaclient.With(ctx) t.Setenv("SYSTEM_NAMESPACE", systemNamespace) diff --git a/control-plane/pkg/reconciler/consumergroup/evictor_test.go b/control-plane/pkg/reconciler/consumergroup/evictor_test.go index 2da86f0ca7..fab9cbc319 100644 --- a/control-plane/pkg/reconciler/consumergroup/evictor_test.go +++ b/control-plane/pkg/reconciler/consumergroup/evictor_test.go @@ -17,6 +17,7 @@ package consumergroup import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -26,21 +27,23 @@ import ( "k8s.io/utils/pointer" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" kubeclient "knative.dev/pkg/client/injection/kube/client/fake" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" reconcilertesting "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" kafkainternalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" ) func TestNewEvictor(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) require.NotPanics(t, func() { newEvictor(ctx, zap.String("k", "n")) }) } func TestEvictorNilPodNoPanic(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) var pod *corev1.Pod @@ -74,7 +77,7 @@ func TestEvictorNilPodNoPanic(t *testing.T) { } func TestEvictorEvictSuccess(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -111,7 +114,7 @@ func TestEvictorEvictSuccess(t *testing.T) { } func TestEvictorNoEvictionEmptyPlacement(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -145,7 +148,7 @@ func TestEvictorNoEvictionEmptyPlacement(t *testing.T) { } func TestEvictorNoEviction(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -181,7 +184,7 @@ func TestEvictorNoEviction(t *testing.T) { } func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -212,7 +215,7 @@ func TestEvictorEvictSuccessConsumerGroupSchedulingInProgress(t *testing.T) { } func TestEvictorEvictPodNotFound(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -238,7 +241,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) { require.Nil(t, err) } func TestEvictorEvictConsumerGroupNotFound(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, withFilteredSelectors) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name"}, @@ -263,3 +266,9 @@ func TestEvictorEvictConsumerGroupNotFound(t *testing.T) { require.Nil(t, err) } + +func withFilteredSelectors(ctx context.Context) context.Context { + return filteredFactory.WithSelectors(ctx, + eventing.DispatcherLabelSelectorStr, + ) +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake/fake.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake/fake.go new file mode 100644 index 0000000000..529c6de66f --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake/fake.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + filtered "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered" + factoryfiltered "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +var Get = filtered.Get + +func init() { + injection.Fake.RegisterFilteredInformers(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(factoryfiltered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := factoryfiltered.Get(ctx, selector) + inf := f.Core().V1().Pods() + ctx = context.WithValue(ctx, filtered.Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/pod.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/pod.go new file mode 100644 index 0000000000..bd3fcc898e --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/pod.go @@ -0,0 +1,65 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1 "k8s.io/client-go/informers/core/v1" + filtered "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Core().V1().Pods() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1.PodInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch k8s.io/client-go/informers/core/v1.PodInformer with selector %s from context.", selector) + } + return untyped.(v1.PodInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 07a3e192d2..390d93b83f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1357,6 +1357,8 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/node knative.dev/pkg/client/injection/kube/informers/core/v1/node/fake knative.dev/pkg/client/injection/kube/informers/core/v1/pod knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake +knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered +knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered/fake knative.dev/pkg/client/injection/kube/informers/core/v1/secret knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake knative.dev/pkg/client/injection/kube/informers/core/v1/service