Skip to content

Commit

Permalink
Use v2 specific label to avoid matching previous deployment-based dis…
Browse files Browse the repository at this point in the history
…patchers (#4105)

This ensures that our `consumergroup` controller as well as replication
controller only match v2-specific pods and won't try to reconcile
pods that belong to the previous deployment.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
knative-prow-robot and pierDipi authored Oct 2, 2024
1 parent ea10ce6 commit d1e6cb4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 10 deletions.
5 changes: 4 additions & 1 deletion control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,10 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1
}

func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
selector := labels.SelectorFromSet(map[string]string{
"app": scheduler.StatefulSetName,
"app.kubernetes.io/kind": "kafka-dispatcher",
})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
List(selector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func TestReconcileKind(t *testing.T) {
Name: "Consumers in multiple pods, with pods pending and unknown phase",
Objects: []runtime.Object{
NewService(),
NewDispatcherPod("p1", PodLabel(kafkainternals.SourceStatefulSetName), PodPending()),
NewDispatcherPod("p2", PodLabel(kafkainternals.SourceStatefulSetName)),
NewDispatcherPod("p1", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel(), PodPending()),
NewDispatcherPod("p2", PodLabel("app", kafkainternals.SourceStatefulSetName), DispatcherLabel()),
NewConsumerGroup(
ConsumerGroupConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
Expand Down Expand Up @@ -1717,6 +1717,10 @@ func TestReconcileKind(t *testing.T) {

}

func DispatcherLabel() PodOption {
return PodLabel("app.kubernetes.io/kind", "kafka-dispatcher")
}

func TestReconcileKindNoAutoscaler(t *testing.T) {

tt := TableTest{
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/testing/objects_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,8 @@ func BrokerDispatcherPod(namespace string, annotations map[string]string) runtim
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.BrokerDispatcherLabel,
"app": base.BrokerDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/testing/objects_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func ChannelDispatcherPod(namespace string, annotations map[string]string) runti
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.ChannelDispatcherLabel,
"app": base.ChannelDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/testing/objects_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,12 +616,12 @@ func NewDispatcherPod(name string, options ...PodOption) *corev1.Pod {
return p
}

func PodLabel(value string) PodOption {
func PodLabel(key, value string) PodOption {
return func(pod *corev1.Pod) {
if pod.Labels == nil {
pod.Labels = make(map[string]string, 2)
}
pod.Labels["app"] = value
pod.Labels[key] = value
}
}

Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/testing/objects_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/apis/bindings/v1beta1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
)
Expand Down Expand Up @@ -215,7 +216,8 @@ func SourceDispatcherPod(namespace string, annotations map[string]string) runtim
Namespace: namespace,
Annotations: annotations,
Labels: map[string]string{
"app": base.SourceDispatcherLabel,
"app": base.SourceDispatcherLabel,
"app.kubernetes.io/kind": "kafka-dispatcher",
},
},
Status: corev1.PodStatus{
Expand Down

0 comments on commit d1e6cb4

Please sign in to comment.