Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.14] fix: do not build OIDC config unless enabled (#4021) #4056

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base
import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -66,6 +66,9 @@ const (
// volume generation annotation data plane pods.
VolumeGenerationAnnotationKey = "volumeGeneration"

// config features update time annotation for data plane pods.
ConfigFeaturesUpdatedAnnotationKey = "configFeaturesUpdatedAt"

Protobuf = "protobuf"
Json = "json"
)
Expand Down Expand Up @@ -266,23 +269,31 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(ctx context.Context, contract *con
return nil
}

func (r *Reconciler) UpdateDispatcherPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateDispatcherPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.dispatcherSelector())
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "dispatcher", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverPodsAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
func (r *Reconciler) UpdateReceiverPodsContractGenerationAnnotation(ctx context.Context, logger *zap.Logger, volumeGeneration uint64) error {
pods, errors := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.DataPlaneNamespace, errors)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", volumeGeneration, pods)
return r.UpdatePodsAnnotation(ctx, logger, "receiver", VolumeGenerationAnnotationKey, fmt.Sprint(volumeGeneration), pods)
}

func (r *Reconciler) UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx context.Context, logger *zap.Logger) error {
pods, err := r.PodLister.Pods(r.DataPlaneNamespace).List(r.ReceiverSelector())
if err != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %s", r.DataPlaneNamespace, err)
}
return r.UpdatePodsAnnotation(ctx, logger, "receiver", ConfigFeaturesUpdatedAnnotationKey, time.Now().String(), pods)
}

func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component string, volumeGeneration uint64, pods []*corev1.Pod) error {
func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logger, component, annotationKey, annotationValue string, pods []*corev1.Pod) error {

var errors error

Expand All @@ -291,7 +302,7 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
logger.Debug(
"Update "+component+" pod annotation",
zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
zap.Uint64("volumeGeneration", volumeGeneration),
zap.String(annotationKey, annotationValue),
)

// do not update cache copy
Expand All @@ -303,15 +314,15 @@ func (r *Reconciler) UpdatePodsAnnotation(ctx context.Context, logger *zap.Logge
}

// Check whether pod's annotation is the expected one.
if v, ok := annotations[VolumeGenerationAnnotationKey]; ok {
v, err := strconv.ParseUint(v /* base */, 10 /* bitSize */, 64)
if err == nil && v == volumeGeneration {
// Volume generation already matches the expected volume generation number.
if v, ok := annotations[annotationKey]; ok {
if v == annotationValue {
logger.Debug(component + " pod annotation already up to date")
// annotation is already correct.
continue
}
}

annotations[VolumeGenerationAnnotationKey] = fmt.Sprint(volumeGeneration)
annotations[annotationKey] = annotationValue
pod.SetAnnotations(annotations)

if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(ctx, pod, metav1.UpdateOptions{}); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/base/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestUpdateReceiverPodAnnotation(t *testing.T) {
ReceiverLabel: base.SinkReceiverLabel,
}

err := r.UpdateReceiverPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand All @@ -247,7 +247,7 @@ func TestUpdateDispatcherPodAnnotation(t *testing.T) {
DispatcherLabel: label,
}

err := r.UpdateDispatcherPodsAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logging.FromContext(ctx).Desugar(), 1)
require.Nil(t, err)
}

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -226,7 +226,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Broker
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -505,11 +505,11 @@ func (r *Reconciler) deleteResourceFromContractConfigMap(ctx context.Context, lo
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
8 changes: 4 additions & 4 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand All @@ -276,7 +276,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
logger.Debug("Updated receiver pod annotation")

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Channel
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -417,11 +417,11 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}
// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
20 changes: 12 additions & 8 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

logger := logging.FromContext(ctx)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(watcher)

_, err := reconciler.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
logger.Fatal("Failed to get or create data plane config map",
Expand All @@ -105,6 +97,18 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

var globalResync func(obj interface{})
featureStore := feature.NewStore(logger.Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

impl := kafkachannelreconciler.NewImpl(ctx, reconciler,
func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
// the update even if here eventually means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
Expand Down Expand Up @@ -416,7 +416,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (r *Reconciler) schedule(ctx context.Context, logger *zap.Logger, c *kafkai
}
}

return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, ct.Generation, []*corev1.Pod{p})
return true, b.UpdatePodsAnnotation(ctx, logger, "dispatcher" /* component, for logging */, base.VolumeGenerationAnnotationKey, fmt.Sprint(ct.Generation), []*corev1.Pod{p})
}

func (r *Reconciler) commonReconciler(p *corev1.Pod, cmName string) base.Reconciler {
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
if globalResync != nil {
globalResync(obj)
}
err = reconciler.UpdateReceiverConfigFeaturesUpdatedAnnotation(ctx, logger.Desugar())
if err != nil {
logger.Warn("config-features updated, but the receiver pods were not successfully annotated. This may lead to features not working as expected.", zap.Error(err))
}
})
featureStore.WatchConfigs(watcher)

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
// receivers haven't got the Sink, so update failures to receiver pods is a hard failure.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down Expand Up @@ -347,7 +347,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e
// Note: if there aren't changes to be done at the pod annotation level, we just skip the update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions control-plane/pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigge
}

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger
// ready. So, log out the error and move on to the next step.
Expand Down Expand Up @@ -299,7 +299,7 @@ func (r *Reconciler) finalizeKind(ctx context.Context, trigger *eventing.Trigger
logger.Debug("Updated data plane config map", zap.String("configmap", r.Env.DataPlaneConfigMapAsString()))

// Update volume generation annotation of dispatcher pods
if err := r.UpdateDispatcherPodsAnnotation(ctx, logger, ct.Generation); err != nil {
if err := r.UpdateDispatcherPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
// Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds.
// The delete trigger will eventually be seen by the data plane pods, so log out the error and move on to the
// next step.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class FileWatcher implements AutoCloseable {
/**
* All args constructor.
*
* @param contractConsumer updates receiver.
* @param triggerFunction is triggered whenever there is a file change.
* @param file file to watch
*/
public FileWatcher(File file, Runnable triggerFunction) {
Expand Down
Loading
Loading