Skip to content

Commit

Permalink
[release-1.12] fix selector path for cg scale subresource (#3688)
Browse files Browse the repository at this point in the history
* fix selector path for cg scale subresource

Signed-off-by: Calum Murray <[email protected]>

* set .status.selector properly

Signed-off-by: Calum Murray <[email protected]>

* fix: unit tests expect correct cg status

Signed-off-by: Calum Murray <[email protected]>

* fix: selector is omitted from struct when empty

Co-authored-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Calum Murray <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
3 people authored Feb 9, 2024
1 parent a3ac3c2 commit c95e06b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type ConsumerGroupStatus struct {
// same Template, but individual replicas also have a consistent identity.
// +optional
Replicas *int32 `json:"replicas,omitempty"`

// Selector is the string serialized label selector needed for the scale subresource.
// Defaults to ""
Selector string `json:"selector,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ type Reconciler struct {
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
recordExpectedReplicasMetric(ctx, cg)

r.reconcileStatusSelector(cg)

if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -259,6 +261,10 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consum
return nil
}

func (r *Reconciler) reconcileStatusSelector(cg *kafkainternals.ConsumerGroup) {
cg.Status.Selector = labels.SelectorFromValidatedSet(cg.Spec.Selector).String()
}

func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupReplicas(2),
ConsumerGroupStatusReplicas(0),
ConsumerForTrigger(),
ConsumerGroupStatusSelector(ConsumerLabels),
)
cg.Status.Placements = []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
Expand Down Expand Up @@ -234,6 +235,7 @@ func TestReconcileKind(t *testing.T) {
),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupStatusReplicas(0),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -340,6 +342,7 @@ func TestReconcileKind(t *testing.T) {
}),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerForTrigger(),
)
cg.InitializeConditions()
Expand Down Expand Up @@ -448,6 +451,7 @@ func TestReconcileKind(t *testing.T) {
}),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupStatusReplicas(0),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -572,6 +576,7 @@ func TestReconcileKind(t *testing.T) {
}),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupStatusReplicas(1),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -758,6 +763,7 @@ func TestReconcileKind(t *testing.T) {
}),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerForTrigger(),
)
cg.InitializeConditions()
Expand Down Expand Up @@ -974,6 +980,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusReplicas(1),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerForTrigger(),
)
cg.Status.Placements = []eventingduckv1alpha1.Placement{
Expand Down Expand Up @@ -1063,6 +1070,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupReplicas(1),
ConsumerGroupStatusReplicas(0),
ConsumerForTrigger(),
ConsumerGroupStatusSelector(ConsumerLabels),
)
cg.Status.Placements = []eventingduckv1alpha1.Placement{
{PodName: "p1", VReplicas: 1},
Expand Down Expand Up @@ -1144,6 +1152,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupIdConfig("my.group.id"),
),
)),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupReplicas(2),
ConsumerGroupStatusReplicas(0),
ConsumerForTrigger(),
Expand Down Expand Up @@ -1230,6 +1239,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupIdConfig("my.group.id"),
),
)),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupReplicas(2),
ConsumerGroupStatusReplicas(1),
ConsumerForTrigger(),
Expand Down Expand Up @@ -1358,6 +1368,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerInitialOffset(sources.OffsetLatest),
)),
)),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupReplicas(3),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -1468,6 +1479,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupIdConfig("my.group.id"),
),
)),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupReplicas(3),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -1577,6 +1589,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerGroupIdConfig("my.group.id"),
),
)),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupReplicas(2),
ConsumerForTrigger(),
)
Expand Down Expand Up @@ -1640,6 +1653,7 @@ func TestReconcileKind(t *testing.T) {
),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerForTrigger(),
)
cg.GetConditionSet().Manage(cg.GetStatus()).InitializeConditions()
Expand Down Expand Up @@ -1776,6 +1790,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
),
)),
ConsumerGroupReplicas(2),
ConsumerGroupStatusSelector(ConsumerLabels),
ConsumerGroupStatusReplicas(1),
ConsumerForTrigger(),
)
Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/utils/pointer"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -189,6 +190,12 @@ func ConsumerGroupStatusReplicas(replicas int32) ConsumerGroupOption {
}
}

func ConsumerGroupStatusSelector(label map[string]string) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Status.Selector = labels.SelectorFromSet(label).String()
}
}

func ConsumerGroupReplicasStatus(replicas int32) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Status.Replicas = pointer.Int32(replicas)
Expand Down

0 comments on commit c95e06b

Please sign in to comment.