diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index ec230a3d4d..64cc211085 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -766,7 +766,10 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1 } func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error { - selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName}) + selector := labels.SelectorFromSet(map[string]string{ + "app": scheduler.StatefulSetName, + "app.kubernetes.io/kind": "kafka-dispatcher", + }) pods, err := r.PodLister. Pods(r.SystemNamespace). List(selector) diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go index 37985e0706..987e4a9b55 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup_test.go @@ -167,8 +167,8 @@ func TestReconcileKind(t *testing.T) { Name: "Consumers in multiple pods, with pods pending and unknown phase", Objects: []runtime.Object{ NewService(), - NewDispatcherPod("p1", PodLabel(kafkainternals.SourceStatefulSetName), PodPending()), - NewDispatcherPod("p2", PodLabel(kafkainternals.SourceStatefulSetName)), + NewDispatcherPod("p1", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel(), PodPending()), + NewDispatcherPod("p2", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel()), NewConsumerGroup( ConsumerGroupConsumerSpec(NewConsumerSpec( ConsumerTopics("t1", "t2"), @@ -1717,6 +1717,10 @@ func TestReconcileKind(t *testing.T) { } +func DispatcherLabel() PodOption { + return PodLabel("app.kubernetes.io/kind", "kafka-dispatcher") +} + func TestReconcileKindNoAutoscaler(t *testing.T) { tt := TableTest{ diff --git a/control-plane/pkg/reconciler/testing/objects_broker.go b/control-plane/pkg/reconciler/testing/objects_broker.go index e07e79ce9d..40d85c4903 100644 --- a/control-plane/pkg/reconciler/testing/objects_broker.go +++ b/control-plane/pkg/reconciler/testing/objects_broker.go @@ -424,7 +424,8 @@ func BrokerDispatcherPod(namespace string, annotations map[string]string) runtim Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.BrokerDispatcherLabel, + "app": base.BrokerDispatcherLabel, + "app.kubernetes.io/kind": "kafka-dispatcher", }, }, Status: corev1.PodStatus{ diff --git a/control-plane/pkg/reconciler/testing/objects_channel.go b/control-plane/pkg/reconciler/testing/objects_channel.go index 76d3c5850c..34cf1bbe88 100644 --- a/control-plane/pkg/reconciler/testing/objects_channel.go +++ b/control-plane/pkg/reconciler/testing/objects_channel.go @@ -191,7 +191,8 @@ func ChannelDispatcherPod(namespace string, annotations map[string]string) runti Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.ChannelDispatcherLabel, + "app": base.ChannelDispatcherLabel, + "app.kubernetes.io/kind": "kafka-dispatcher", }, }, Status: corev1.PodStatus{ diff --git a/control-plane/pkg/reconciler/testing/objects_common.go b/control-plane/pkg/reconciler/testing/objects_common.go index f4afcb121a..827a3294bc 100644 --- a/control-plane/pkg/reconciler/testing/objects_common.go +++ b/control-plane/pkg/reconciler/testing/objects_common.go @@ -616,12 +616,12 @@ func NewDispatcherPod(name string, options ...PodOption) *corev1.Pod { return p } -func PodLabel(value string) PodOption { +func PodLabel(key, value string) PodOption { return func(pod *corev1.Pod) { if pod.Labels == nil { pod.Labels = make(map[string]string, 2) } - pod.Labels["app"] = value + pod.Labels[key] = value } } diff --git a/control-plane/pkg/reconciler/testing/objects_source.go b/control-plane/pkg/reconciler/testing/objects_source.go index af11432cf7..58b3bb8c8d 100644 --- a/control-plane/pkg/reconciler/testing/objects_source.go +++ b/control-plane/pkg/reconciler/testing/objects_source.go @@ -23,14 +23,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" - "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1" - sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1" + sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" ) @@ -215,7 +216,8 @@ func SourceDispatcherPod(namespace string, annotations map[string]string) runtim Namespace: namespace, Annotations: annotations, Labels: map[string]string{ - "app": base.SourceDispatcherLabel, + "app": base.SourceDispatcherLabel, + "app.kubernetes.io/kind": "kafka-dispatcher", }, }, Status: corev1.PodStatus{